risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry::{Occupied, Vacant};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47    id: SchemaId,
48    pub name: String,
49    pub database_id: DatabaseId,
50    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
51    table_by_name: HashMap<String, Arc<TableCatalog>>,
52    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
53    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54    source_by_name: HashMap<String, Arc<SourceCatalog>>,
55    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
59    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
60    index_by_name: HashMap<String, Arc<IndexCatalog>>,
61    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
62    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
63    view_by_name: HashMap<String, Arc<ViewCatalog>>,
64    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
65    function_registry: FunctionRegistry,
66    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
67    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
68    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
69    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
70    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
71    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
72
73    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
74    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
75
76    // This field is currently used only for `show connections`
77    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
78    // This field is currently used only for `show connections`
79    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
80    // This field only available when schema is "pg_catalog". Meanwhile, others will be empty.
81    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
82    pub owner: u32,
83}
84
85impl SchemaCatalog {
86    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
87        let name = prost.name.clone();
88        let id = prost.id.into();
89        let table: TableCatalog = prost.into();
90        let table_ref = Arc::new(table);
91
92        self.table_by_name
93            .try_insert(name, table_ref.clone())
94            .unwrap();
95        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
96        table_ref
97    }
98
99    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
100        self.system_table_by_name
101            .try_insert(sys_table.name.clone(), sys_table)
102            .unwrap();
103    }
104
105    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
106        self.view_by_name
107            .try_insert(sys_view.name().to_owned(), sys_view.clone())
108            .unwrap();
109        self.view_by_id
110            .try_insert(sys_view.id, sys_view.clone())
111            .unwrap();
112    }
113
114    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
115        let name = prost.name.clone();
116        let id = prost.id.into();
117        let table: TableCatalog = prost.into();
118        let table_ref = Arc::new(table);
119
120        let old_table = self.table_by_id.get(&id).unwrap();
121        // check if the table name gets updated.
122        if old_table.name() != name
123            && let Some(t) = self.table_by_name.get(old_table.name())
124            && t.id == id
125        {
126            self.table_by_name.remove(old_table.name());
127        }
128
129        self.table_by_name.insert(name, table_ref.clone());
130        self.table_by_id.insert(id, table_ref.clone());
131        table_ref
132    }
133
134    pub fn update_index(&mut self, prost: &PbIndex) {
135        let name = prost.name.clone();
136        let id = prost.id.into();
137        let old_index = self.index_by_id.get(&id).unwrap();
138        let index_table = self
139            .get_created_table_by_id(&prost.index_table_id.into())
140            .unwrap();
141        let primary_table = self
142            .get_created_table_by_id(&prost.primary_table_id.into())
143            .unwrap();
144        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145        let index_ref = Arc::new(index);
146
147        // check if the index name gets updated.
148        if old_index.name != name
149            && let Some(idx) = self.index_by_name.get(&old_index.name)
150            && idx.id == id
151        {
152            self.index_by_name.remove(&old_index.name);
153        }
154        self.index_by_name.insert(name, index_ref.clone());
155        self.index_by_id.insert(id, index_ref.clone());
156
157        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158            Occupied(mut entry) => {
159                let pos = entry
160                    .get()
161                    .iter()
162                    .position(|x| x.id == index_ref.id)
163                    .unwrap();
164                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165            }
166            Vacant(_entry) => {
167                unreachable!()
168            }
169        };
170    }
171
172    pub fn drop_table(&mut self, id: TableId) {
173        if let Some(table_ref) = self.table_by_id.remove(&id) {
174            self.table_by_name.remove(&table_ref.name).unwrap();
175            self.indexes_by_table_id.remove(&table_ref.id);
176        } else {
177            tracing::warn!(
178                id = ?id.table_id,
179                "table not found when dropping, frontend might not be notified yet"
180            );
181        }
182    }
183
184    pub fn create_index(&mut self, prost: &PbIndex) {
185        let name = prost.name.clone();
186        let id = prost.id.into();
187        let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
188        let primary_table = self
189            .get_created_table_by_id(&prost.primary_table_id.into())
190            .unwrap();
191        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192        let index_ref = Arc::new(index);
193
194        self.index_by_name
195            .try_insert(name, index_ref.clone())
196            .unwrap();
197        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199            Occupied(mut entry) => {
200                entry.get_mut().push(index_ref);
201            }
202            Vacant(entry) => {
203                entry.insert(vec![index_ref]);
204            }
205        };
206    }
207
208    pub fn drop_index(&mut self, id: IndexId) {
209        let index_ref = self.index_by_id.remove(&id).unwrap();
210        self.index_by_name.remove(&index_ref.name).unwrap();
211        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212            Occupied(mut entry) => {
213                let pos = entry
214                    .get_mut()
215                    .iter()
216                    .position(|x| x.id == index_ref.id)
217                    .unwrap();
218                entry.get_mut().remove(pos);
219            }
220            Vacant(_entry) => (),
221        };
222    }
223
224    pub fn create_source(&mut self, prost: &PbSource) {
225        let name = prost.name.clone();
226        let id = prost.id;
227        let source = SourceCatalog::from(prost);
228        let source_ref = Arc::new(source);
229
230        if let Some(connection_id) = source_ref.connection_id {
231            self.connection_source_ref
232                .entry(connection_id)
233                .and_modify(|sources| sources.push(source_ref.id))
234                .or_insert(vec![source_ref.id]);
235        }
236
237        self.source_by_name
238            .try_insert(name, source_ref.clone())
239            .unwrap();
240        self.source_by_id.try_insert(id, source_ref).unwrap();
241    }
242
243    pub fn drop_source(&mut self, id: SourceId) {
244        let source_ref = self.source_by_id.remove(&id).unwrap();
245        self.source_by_name.remove(&source_ref.name).unwrap();
246        if let Some(connection_id) = source_ref.connection_id
247            && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248        {
249            let source_ids = e.get_mut();
250            source_ids.retain_mut(|sid| *sid != id);
251            if source_ids.is_empty() {
252                e.remove_entry();
253            }
254        }
255    }
256
257    pub fn update_source(&mut self, prost: &PbSource) {
258        let name = prost.name.clone();
259        let id = prost.id;
260        let source = SourceCatalog::from(prost);
261        let source_ref = Arc::new(source);
262
263        let old_source = self.source_by_id.get(&id).unwrap();
264        // check if the source name gets updated.
265        if old_source.name != name
266            && let Some(src) = self.source_by_name.get(&old_source.name)
267            && src.id == id
268        {
269            self.source_by_name.remove(&old_source.name);
270        }
271
272        self.source_by_name.insert(name, source_ref.clone());
273        self.source_by_id.insert(id, source_ref);
274    }
275
276    pub fn create_sink(&mut self, prost: &PbSink) {
277        let name = prost.name.clone();
278        let id = prost.id;
279        let sink = SinkCatalog::from(prost);
280        let sink_ref = Arc::new(sink);
281
282        if let Some(connection_id) = sink_ref.connection_id {
283            self.connection_sink_ref
284                .entry(connection_id.0)
285                .and_modify(|sinks| sinks.push(id))
286                .or_insert(vec![id]);
287        }
288
289        self.sink_by_name
290            .try_insert(name, sink_ref.clone())
291            .unwrap();
292        self.sink_by_id.try_insert(id, sink_ref).unwrap();
293    }
294
295    pub fn drop_sink(&mut self, id: SinkId) {
296        if let Some(sink_ref) = self.sink_by_id.remove(&id) {
297            self.sink_by_name.remove(&sink_ref.name).unwrap();
298            if let Some(connection_id) = sink_ref.connection_id
299                && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
300            {
301                let sink_ids = e.get_mut();
302                sink_ids.retain_mut(|sid| *sid != id);
303                if sink_ids.is_empty() {
304                    e.remove_entry();
305                }
306            }
307        } else {
308            tracing::warn!(
309                id,
310                "sink not found when dropping, frontend might not be notified yet"
311            );
312        }
313    }
314
315    pub fn update_sink(&mut self, prost: &PbSink) {
316        let name = prost.name.clone();
317        let id = prost.id;
318        let sink = SinkCatalog::from(prost);
319        let sink_ref = Arc::new(sink);
320
321        let old_sink = self.sink_by_id.get(&id).unwrap();
322        // check if the sink name gets updated.
323        if old_sink.name != name
324            && let Some(s) = self.sink_by_name.get(&old_sink.name)
325            && s.id.sink_id == id
326        {
327            self.sink_by_name.remove(&old_sink.name);
328        }
329
330        self.sink_by_name.insert(name, sink_ref.clone());
331        self.sink_by_id.insert(id, sink_ref);
332    }
333
334    pub fn create_subscription(&mut self, prost: &PbSubscription) {
335        let name = prost.name.clone();
336        let id = prost.id;
337        let subscription_catalog = SubscriptionCatalog::from(prost);
338        let subscription_ref = Arc::new(subscription_catalog);
339
340        self.subscription_by_name
341            .try_insert(name, subscription_ref.clone())
342            .unwrap();
343        self.subscription_by_id
344            .try_insert(id, subscription_ref)
345            .unwrap();
346    }
347
348    pub fn drop_subscription(&mut self, id: SubscriptionId) {
349        let subscription_ref = self.subscription_by_id.remove(&id);
350        if let Some(subscription_ref) = subscription_ref {
351            self.subscription_by_name.remove(&subscription_ref.name);
352        }
353    }
354
355    pub fn update_subscription(&mut self, prost: &PbSubscription) {
356        let name = prost.name.clone();
357        let id = prost.id;
358        let subscription = SubscriptionCatalog::from(prost);
359        let subscription_ref = Arc::new(subscription);
360
361        let old_subscription = self.subscription_by_id.get(&id).unwrap();
362        // check if the subscription name gets updated.
363        if old_subscription.name != name
364            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
365            && s.id.subscription_id == id
366        {
367            self.subscription_by_name.remove(&old_subscription.name);
368        }
369
370        self.subscription_by_name
371            .insert(name, subscription_ref.clone());
372        self.subscription_by_id.insert(id, subscription_ref);
373    }
374
375    pub fn create_view(&mut self, prost: &PbView) {
376        let name = prost.name.clone();
377        let id = prost.id;
378        let view = ViewCatalog::from(prost);
379        let view_ref = Arc::new(view);
380
381        self.view_by_name
382            .try_insert(name, view_ref.clone())
383            .unwrap();
384        self.view_by_id.try_insert(id, view_ref).unwrap();
385    }
386
387    pub fn drop_view(&mut self, id: ViewId) {
388        let view_ref = self.view_by_id.remove(&id).unwrap();
389        self.view_by_name.remove(&view_ref.name).unwrap();
390    }
391
392    pub fn update_view(&mut self, prost: &PbView) {
393        let name = prost.name.clone();
394        let id = prost.id;
395        let view = ViewCatalog::from(prost);
396        let view_ref = Arc::new(view);
397
398        let old_view = self.view_by_id.get(&id).unwrap();
399        // check if the view name gets updated.
400        if old_view.name != name
401            && let Some(v) = self.view_by_name.get(old_view.name())
402            && v.id == id
403        {
404            self.view_by_name.remove(&old_view.name);
405        }
406
407        self.view_by_name.insert(name, view_ref.clone());
408        self.view_by_id.insert(id, view_ref);
409    }
410
411    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
412        FuncSign {
413            name: FuncName::Udf(func.name.clone()),
414            inputs_type: func
415                .arg_types
416                .iter()
417                .map(|t| t.clone().into())
418                .collect_vec(),
419            variadic: false,
420            ret_type: func.return_type.clone().into(),
421            build: FuncBuilder::Udf,
422            // dummy type infer, will not use this result
423            type_infer: |_| Ok(DataType::Boolean),
424            deprecated: false,
425        }
426    }
427
428    pub fn create_function(&mut self, prost: &PbFunction) {
429        let name = prost.name.clone();
430        let id = prost.id;
431        let function = FunctionCatalog::from(prost);
432        let args = function.arg_types.clone();
433        let function_ref = Arc::new(function);
434
435        self.function_registry
436            .insert(Self::get_func_sign(&function_ref));
437        self.function_by_name
438            .entry(name)
439            .or_default()
440            .try_insert(args, function_ref.clone())
441            .expect("function already exists with same argument types");
442        self.function_by_id
443            .try_insert(id.into(), function_ref)
444            .expect("function id exists");
445    }
446
447    pub fn drop_function(&mut self, id: FunctionId) {
448        let function_ref = self
449            .function_by_id
450            .remove(&id)
451            .expect("function not found by id");
452
453        self.function_registry
454            .remove(Self::get_func_sign(&function_ref))
455            .expect("function not found in registry");
456
457        self.function_by_name
458            .get_mut(&function_ref.name)
459            .expect("function not found by name")
460            .remove(&function_ref.arg_types)
461            .expect("function not found by argument types");
462    }
463
464    pub fn update_function(&mut self, prost: &PbFunction) {
465        let name = prost.name.clone();
466        let id = prost.id.into();
467        let function = FunctionCatalog::from(prost);
468        let function_ref = Arc::new(function);
469
470        let old_function_by_id = self.function_by_id.get(&id).unwrap();
471        let old_function_by_name = self
472            .function_by_name
473            .get_mut(&old_function_by_id.name)
474            .unwrap();
475        // check if the function name gets updated.
476        if old_function_by_id.name != name
477            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
478            && f.id == id
479        {
480            old_function_by_name.remove(&old_function_by_id.arg_types);
481            if old_function_by_name.is_empty() {
482                self.function_by_name.remove(&old_function_by_id.name);
483            }
484        }
485
486        self.function_by_name
487            .entry(name)
488            .or_default()
489            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
490        self.function_by_id.insert(id, function_ref);
491    }
492
493    pub fn create_connection(&mut self, prost: &PbConnection) {
494        let name = prost.name.clone();
495        let id = prost.id;
496        let connection = ConnectionCatalog::from(prost);
497        let connection_ref = Arc::new(connection);
498        self.connection_by_name
499            .try_insert(name, connection_ref.clone())
500            .unwrap();
501        self.connection_by_id
502            .try_insert(id, connection_ref)
503            .unwrap();
504    }
505
506    pub fn update_connection(&mut self, prost: &PbConnection) {
507        let name = prost.name.clone();
508        let id = prost.id;
509        let connection = ConnectionCatalog::from(prost);
510        let connection_ref = Arc::new(connection);
511
512        let old_connection = self.connection_by_id.get(&id).unwrap();
513        // check if the connection name gets updated.
514        if old_connection.name != name
515            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
516            && conn.id == id
517        {
518            self.connection_by_name.remove(&old_connection.name);
519        }
520
521        self.connection_by_name.insert(name, connection_ref.clone());
522        self.connection_by_id.insert(id, connection_ref);
523    }
524
525    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
526        let connection_ref = self
527            .connection_by_id
528            .remove(&connection_id)
529            .expect("connection not found by id");
530        self.connection_by_name
531            .remove(&connection_ref.name)
532            .expect("connection not found by name");
533    }
534
535    pub fn create_secret(&mut self, prost: &PbSecret) {
536        let name = prost.name.clone();
537        let id = SecretId::new(prost.id);
538        let secret = SecretCatalog::from(prost);
539        let secret_ref = Arc::new(secret);
540
541        self.secret_by_id
542            .try_insert(id, secret_ref.clone())
543            .unwrap();
544        self.secret_by_name
545            .try_insert(name, secret_ref.clone())
546            .unwrap();
547    }
548
549    pub fn update_secret(&mut self, prost: &PbSecret) {
550        let name = prost.name.clone();
551        let id = SecretId::new(prost.id);
552        let secret = SecretCatalog::from(prost);
553        let secret_ref = Arc::new(secret);
554
555        let old_secret = self.secret_by_id.get(&id).unwrap();
556        // check if the secret name gets updated.
557        if old_secret.name != name
558            && let Some(s) = self.secret_by_name.get(&old_secret.name)
559            && s.id == id
560        {
561            self.secret_by_name.remove(&old_secret.name);
562        }
563
564        self.secret_by_name.insert(name, secret_ref.clone());
565        self.secret_by_id.insert(id, secret_ref);
566    }
567
568    pub fn drop_secret(&mut self, secret_id: SecretId) {
569        let secret_ref = self
570            .secret_by_id
571            .remove(&secret_id)
572            .expect("secret not found by id");
573        self.secret_by_name
574            .remove(&secret_ref.name)
575            .expect("secret not found by name");
576    }
577
578    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
579        self.table_by_name.values()
580    }
581
582    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
583        self.table_by_name.values().filter(|v| v.is_user_table())
584    }
585
586    pub fn iter_user_table_with_acl<'a>(
587        &'a self,
588        user: &'a UserCatalog,
589    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
590        self.table_by_name.values().filter(|v| {
591            v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
592        })
593    }
594
595    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
596        self.table_by_name
597            .values()
598            .filter(|v| v.is_internal_table())
599    }
600
601    pub fn iter_internal_table_with_acl<'a>(
602        &'a self,
603        user: &'a UserCatalog,
604    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
605        self.table_by_name.values().filter(|v| {
606            v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
607        })
608    }
609
610    /// Iterate all non-internal tables, including user tables, materialized views and indices.
611    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
612        self.table_by_name
613            .values()
614            .filter(|v| !v.is_internal_table())
615    }
616
617    pub fn iter_table_mv_indices_with_acl<'a>(
618        &'a self,
619        user: &'a UserCatalog,
620    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
621        self.table_by_name.values().filter(|v| {
622            !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
623        })
624    }
625
626    /// Iterate all materialized views, excluding the indices.
627    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
628        self.table_by_name.values().filter(|v| v.is_mview())
629    }
630
631    pub fn iter_all_mvs_with_acl<'a>(
632        &'a self,
633        user: &'a UserCatalog,
634    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
635        self.table_by_name.values().filter(|v| {
636            v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
637        })
638    }
639
640    /// Iterate created materialized views, excluding the indices.
641    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
642        self.table_by_name
643            .values()
644            .filter(|v| v.is_mview() && v.is_created())
645    }
646
647    pub fn iter_created_mvs_with_acl<'a>(
648        &'a self,
649        user: &'a UserCatalog,
650    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
651        self.table_by_name.values().filter(|v| {
652            v.is_mview()
653                && v.is_created()
654                && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
655        })
656    }
657
658    /// Iterate all indices
659    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
660        self.index_by_name.values()
661    }
662
663    pub fn iter_index_with_acl<'a>(
664        &'a self,
665        user: &'a UserCatalog,
666    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
667        self.index_by_name
668            .values()
669            .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
670    }
671
672    /// Iterate all sources
673    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
674        self.source_by_name.values()
675    }
676
677    pub fn iter_source_with_acl<'a>(
678        &'a self,
679        user: &'a UserCatalog,
680    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
681        self.source_by_name
682            .values()
683            .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
684    }
685
686    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
687        self.sink_by_name.values()
688    }
689
690    pub fn iter_sink_with_acl<'a>(
691        &'a self,
692        user: &'a UserCatalog,
693    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
694        self.sink_by_name
695            .values()
696            .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
697    }
698
699    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
700        self.subscription_by_name.values()
701    }
702
703    pub fn iter_subscription_with_acl<'a>(
704        &'a self,
705        user: &'a UserCatalog,
706    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
707        self.subscription_by_name.values().filter(|s| {
708            has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
709        })
710    }
711
712    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
713        self.view_by_name.values()
714    }
715
716    pub fn iter_view_with_acl<'a>(
717        &'a self,
718        user: &'a UserCatalog,
719    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
720        self.view_by_name
721            .values()
722            .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
723    }
724
725    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
726        self.function_by_name.values().flat_map(|v| v.values())
727    }
728
729    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
730        self.connection_by_name.values()
731    }
732
733    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
734        self.secret_by_name.values()
735    }
736
737    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
738        self.system_table_by_name.values()
739    }
740
741    pub fn get_table_by_name(
742        &self,
743        table_name: &str,
744        bind_creating_relations: bool,
745    ) -> Option<&Arc<TableCatalog>> {
746        self.table_by_name
747            .get(table_name)
748            .filter(|&table| bind_creating_relations || table.is_created())
749    }
750
751    pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
752        self.get_table_by_name(table_name, true)
753    }
754
755    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
756        self.get_table_by_name(table_name, false)
757    }
758
759    pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
760        self.table_by_id.get(table_id)
761    }
762
763    pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
764        self.table_by_id
765            .get(table_id)
766            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
767    }
768
769    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
770        self.view_by_name.get(view_name)
771    }
772
773    pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
774        self.view_by_id.get(view_id)
775    }
776
777    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
778        self.source_by_name.get(source_name)
779    }
780
781    pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
782        self.source_by_id.get(source_id)
783    }
784
785    pub fn get_sink_by_name(
786        &self,
787        sink_name: &str,
788        bind_creating: bool,
789    ) -> Option<&Arc<SinkCatalog>> {
790        self.sink_by_name
791            .get(sink_name)
792            .filter(|s| bind_creating || s.is_created())
793    }
794
795    pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
796        self.get_sink_by_name(sink_name, true)
797    }
798
799    pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
800        self.get_sink_by_name(sink_name, false)
801    }
802
803    pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
804        self.sink_by_id.get(sink_id)
805    }
806
807    pub fn get_subscription_by_name(
808        &self,
809        subscription_name: &str,
810    ) -> Option<&Arc<SubscriptionCatalog>> {
811        self.subscription_by_name.get(subscription_name)
812    }
813
814    pub fn get_subscription_by_id(
815        &self,
816        subscription_id: &SubscriptionId,
817    ) -> Option<&Arc<SubscriptionCatalog>> {
818        self.subscription_by_id.get(subscription_id)
819    }
820
821    pub fn get_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
822        self.index_by_name.get(index_name)
823    }
824
825    pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
826        self.index_by_id.get(index_id)
827    }
828
829    /// Returns all indexes on the given table. Will not check if the table exists.
830    pub fn get_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
831        self.indexes_by_table_id
832            .get(table_id)
833            .cloned()
834            .unwrap_or_default()
835    }
836
837    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
838        self.system_table_by_name.get(table_name)
839    }
840
841    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
842        self.table_by_id
843            .get(&table_id)
844            .map(|table| table.name.clone())
845    }
846
847    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
848        self.function_by_id.get(&function_id)
849    }
850
851    pub fn get_function_by_name_inputs(
852        &self,
853        name: &str,
854        inputs: &mut [ExprImpl],
855    ) -> Option<&Arc<FunctionCatalog>> {
856        infer_type_with_sigmap(
857            FuncName::Udf(name.to_owned()),
858            inputs,
859            &self.function_registry,
860        )
861        .ok()?;
862        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
863        self.function_by_name.get(name)?.get(&args)
864    }
865
866    pub fn get_function_by_name_args(
867        &self,
868        name: &str,
869        args: &[DataType],
870    ) -> Option<&Arc<FunctionCatalog>> {
871        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
872        let func = infer_type_name(
873            &self.function_registry,
874            FuncName::Udf(name.to_owned()),
875            &args,
876        )
877        .ok()?;
878
879        let args = func
880            .inputs_type
881            .iter()
882            .filter_map(|x| {
883                if let SigDataType::Exact(t) = x {
884                    Some(t.clone())
885                } else {
886                    None
887                }
888            })
889            .collect_vec();
890
891        self.function_by_name.get(name)?.get(&args)
892    }
893
894    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
895        let functions = self.function_by_name.get(name)?;
896        if functions.is_empty() {
897            return None;
898        }
899        Some(functions.values().collect())
900    }
901
902    pub fn get_connection_by_id(
903        &self,
904        connection_id: &ConnectionId,
905    ) -> Option<&Arc<ConnectionCatalog>> {
906        self.connection_by_id.get(connection_id)
907    }
908
909    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
910        self.connection_by_name.get(connection_name)
911    }
912
913    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
914        self.secret_by_name.get(secret_name)
915    }
916
917    pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
918        self.secret_by_id.get(secret_id)
919    }
920
921    /// get all sources referencing the connection
922    pub fn get_source_ids_by_connection(
923        &self,
924        connection_id: ConnectionId,
925    ) -> Option<Vec<SourceId>> {
926        self.connection_source_ref
927            .get(&connection_id)
928            .map(|c| c.to_owned())
929    }
930
931    /// get all sinks referencing the connection
932    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
933        self.connection_sink_ref
934            .get(&connection_id)
935            .map(|s| s.to_owned())
936    }
937
938    pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
939        #[allow(clippy::manual_map)]
940        if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
941            Some(OwnedGrantObject {
942                owner: table.owner,
943                object: Object::TableId(oid),
944            })
945        } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
946            Some(OwnedGrantObject {
947                owner: index.owner(),
948                object: Object::TableId(oid),
949            })
950        } else if let Some(source) = self.get_source_by_id(&oid) {
951            Some(OwnedGrantObject {
952                owner: source.owner,
953                object: Object::SourceId(oid),
954            })
955        } else if let Some(sink) = self.get_sink_by_id(&oid) {
956            Some(OwnedGrantObject {
957                owner: sink.owner.user_id,
958                object: Object::SinkId(oid),
959            })
960        } else if let Some(view) = self.get_view_by_id(&oid) {
961            Some(OwnedGrantObject {
962                owner: view.owner,
963                object: Object::ViewId(oid),
964            })
965        } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
966            Some(OwnedGrantObject {
967                owner: function.owner(),
968                object: Object::FunctionId(oid),
969            })
970        } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
971            Some(OwnedGrantObject {
972                owner: subscription.owner.user_id,
973                object: Object::SubscriptionId(oid),
974            })
975        } else if let Some(connection) = self.get_connection_by_id(&oid) {
976            Some(OwnedGrantObject {
977                owner: connection.owner,
978                object: Object::ConnectionId(oid),
979            })
980        } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
981            Some(OwnedGrantObject {
982                owner: secret.owner,
983                object: Object::SecretId(oid),
984            })
985        } else {
986            None
987        }
988    }
989
990    pub fn contains_object(&self, oid: u32) -> bool {
991        self.table_by_id.contains_key(&TableId::new(oid))
992            || self.index_by_id.contains_key(&IndexId::new(oid))
993            || self.source_by_id.contains_key(&oid)
994            || self.sink_by_id.contains_key(&oid)
995            || self.view_by_id.contains_key(&oid)
996            || self.function_by_id.contains_key(&FunctionId::new(oid))
997            || self.subscription_by_id.contains_key(&oid)
998            || self.connection_by_id.contains_key(&oid)
999    }
1000
1001    pub fn id(&self) -> SchemaId {
1002        self.id
1003    }
1004
1005    pub fn database_id(&self) -> DatabaseId {
1006        self.database_id
1007    }
1008
1009    pub fn name(&self) -> String {
1010        self.name.clone()
1011    }
1012}
1013
1014impl OwnedByUserCatalog for SchemaCatalog {
1015    fn owner(&self) -> UserId {
1016        self.owner
1017    }
1018}
1019
1020impl From<&PbSchema> for SchemaCatalog {
1021    fn from(schema: &PbSchema) -> Self {
1022        Self {
1023            id: schema.id,
1024            owner: schema.owner,
1025            name: schema.name.clone(),
1026            database_id: schema.database_id,
1027            table_by_name: HashMap::new(),
1028            table_by_id: HashMap::new(),
1029            source_by_name: HashMap::new(),
1030            source_by_id: HashMap::new(),
1031            sink_by_name: HashMap::new(),
1032            sink_by_id: HashMap::new(),
1033            index_by_name: HashMap::new(),
1034            index_by_id: HashMap::new(),
1035            indexes_by_table_id: HashMap::new(),
1036            system_table_by_name: HashMap::new(),
1037            view_by_name: HashMap::new(),
1038            view_by_id: HashMap::new(),
1039            function_registry: FunctionRegistry::default(),
1040            function_by_name: HashMap::new(),
1041            function_by_id: HashMap::new(),
1042            connection_by_name: HashMap::new(),
1043            connection_by_id: HashMap::new(),
1044            secret_by_name: HashMap::new(),
1045            secret_by_id: HashMap::new(),
1046            _secret_source_ref: HashMap::new(),
1047            _secret_sink_ref: HashMap::new(),
1048            connection_source_ref: HashMap::new(),
1049            connection_sink_ref: HashMap::new(),
1050            subscription_by_name: HashMap::new(),
1051            subscription_by_id: HashMap::new(),
1052        }
1053    }
1054}