risingwave_frontend/catalog/
schema_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25    PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26    PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
/// In-memory catalog of the objects that live in one schema.
///
/// Most object kinds are indexed twice, once by name and once by id; the `create_*`,
/// `update_*` and `drop_*` methods keep the paired maps in step with each other.
#[derive(Clone, Debug)]
pub struct SchemaCatalog {
    id: SchemaId,
    pub name: String,
    pub database_id: DatabaseId,
    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
    table_by_name: HashMap<String, Arc<TableCatalog>>,
    /// Contains [all types of "tables"](super::table_catalog::TableType), not only user tables.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    source_by_name: HashMap<String, Arc<SourceCatalog>>,
    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
    /// Reverse index of (`sink.target_table` -> `sink_id`s), maintained by `create_sink` and
    /// `drop_sink`.
    table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
    index_by_name: HashMap<String, Arc<IndexCatalog>>,
    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
    /// Indexes grouped by the id of their primary table.
    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
    view_by_name: HashMap<String, Arc<ViewCatalog>>,
    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
    /// Signatures of the functions in this schema; `create_function` inserts one `FuncSign`
    /// per function and `drop_function` removes it again.
    function_registry: FunctionRegistry,
    /// Functions keyed by name, then by argument types, so one name can hold several overloads.
    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,

    // NOTE(review): neither `_secret_*_ref` map is touched by the methods visible in this part
    // of the file; the leading underscore suggests they are placeholders. Confirm before use.
    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,

    // This field is currently used only for `show connections`.
    // Maintained by `create_source` / `drop_source`.
    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
    // This field is currently used only for `show connections`.
    // Maintained by `create_sink` / `drop_sink`.
    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
    // This field is only populated when the schema is "pg_catalog"; for other schemas it is empty.
    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
    pub owner: UserId,
}
86
87impl SchemaCatalog {
88    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89        let name = prost.name.clone();
90        let id = prost.id;
91        let table: TableCatalog = prost.into();
92        let table_ref = Arc::new(table);
93
94        self.table_by_name
95            .try_insert(name, table_ref.clone())
96            .unwrap();
97        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98        table_ref
99    }
100
101    pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102        self.system_table_by_name
103            .try_insert(sys_table.name.clone(), sys_table)
104            .unwrap();
105    }
106
107    pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108        self.view_by_name
109            .try_insert(sys_view.name().to_owned(), sys_view.clone())
110            .unwrap();
111        self.view_by_id
112            .try_insert(sys_view.id, sys_view.clone())
113            .unwrap();
114    }
115
116    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117        let name = prost.name.clone();
118        let id = prost.id;
119        let table: TableCatalog = prost.into();
120        let table_ref = Arc::new(table);
121
122        let old_table = self.table_by_id.get(&id).unwrap();
123        // check if the table name gets updated.
124        if old_table.name() != name
125            && let Some(t) = self.table_by_name.get(old_table.name())
126            && t.id == id
127        {
128            self.table_by_name.remove(old_table.name());
129        }
130
131        self.table_by_name.insert(name, table_ref.clone());
132        self.table_by_id.insert(id, table_ref.clone());
133        table_ref
134    }
135
136    pub fn update_index(&mut self, prost: &PbIndex) {
137        let name = prost.name.clone();
138        let id = prost.id;
139        let old_index = self.index_by_id.get(&id).unwrap();
140        let index_table = self.get_created_table_by_id(prost.index_table_id).unwrap();
141        let primary_table = self
142            .get_created_table_by_id(prost.primary_table_id)
143            .unwrap();
144        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145        let index_ref = Arc::new(index);
146
147        // check if the index name gets updated.
148        if old_index.name != name
149            && let Some(idx) = self.index_by_name.get(&old_index.name)
150            && idx.id == id
151        {
152            self.index_by_name.remove(&old_index.name);
153        }
154        self.index_by_name.insert(name, index_ref.clone());
155        self.index_by_id.insert(id, index_ref.clone());
156
157        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158            Occupied(mut entry) => {
159                let pos = entry
160                    .get()
161                    .iter()
162                    .position(|x| x.id == index_ref.id)
163                    .unwrap();
164                *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165            }
166            Vacant(_entry) => {
167                unreachable!()
168            }
169        };
170    }
171
172    pub fn drop_table(&mut self, id: TableId) {
173        if let Some(table_ref) = self.table_by_id.remove(&id) {
174            self.table_by_name.remove(&table_ref.name).unwrap();
175            self.indexes_by_table_id.remove(&table_ref.id);
176        } else {
177            tracing::warn!(
178                %id,
179                "table not found when dropping, frontend might not be notified yet"
180            );
181        }
182    }
183
184    pub fn create_index(&mut self, prost: &PbIndex) {
185        let name = prost.name.clone();
186        let id = prost.id;
187        let index_table = self.get_table_by_id(prost.index_table_id).unwrap();
188        let primary_table = self
189            .get_created_table_by_id(prost.primary_table_id)
190            .unwrap();
191        let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192        let index_ref = Arc::new(index);
193
194        self.index_by_name
195            .try_insert(name, index_ref.clone())
196            .unwrap();
197        self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199            Occupied(mut entry) => {
200                entry.get_mut().push(index_ref);
201            }
202            Vacant(entry) => {
203                entry.insert(vec![index_ref]);
204            }
205        };
206    }
207
208    pub fn drop_index(&mut self, id: IndexId) {
209        let index_ref = self.index_by_id.remove(&id).unwrap();
210        self.index_by_name.remove(&index_ref.name).unwrap();
211        match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212            Occupied(mut entry) => {
213                let pos = entry
214                    .get_mut()
215                    .iter()
216                    .position(|x| x.id == index_ref.id)
217                    .unwrap();
218                entry.get_mut().remove(pos);
219            }
220            Vacant(_entry) => (),
221        };
222    }
223
224    pub fn create_source(&mut self, prost: &PbSource) {
225        let name = prost.name.clone();
226        let id = prost.id;
227        let source = SourceCatalog::from(prost);
228        let source_ref = Arc::new(source);
229
230        if let Some(connection_id) = source_ref.connection_id {
231            self.connection_source_ref
232                .entry(connection_id)
233                .and_modify(|sources| sources.push(source_ref.id))
234                .or_insert(vec![source_ref.id]);
235        }
236
237        self.source_by_name
238            .try_insert(name, source_ref.clone())
239            .unwrap();
240        self.source_by_id.try_insert(id, source_ref).unwrap();
241    }
242
243    pub fn drop_source(&mut self, id: SourceId) {
244        let source_ref = self.source_by_id.remove(&id).unwrap();
245        self.source_by_name.remove(&source_ref.name).unwrap();
246        if let Some(connection_id) = source_ref.connection_id
247            && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248        {
249            let source_ids = e.get_mut();
250            source_ids.retain_mut(|sid| *sid != id);
251            if source_ids.is_empty() {
252                e.remove_entry();
253            }
254        }
255    }
256
257    pub fn update_source(&mut self, prost: &PbSource) {
258        let name = prost.name.clone();
259        let id = prost.id;
260        let source = SourceCatalog::from(prost);
261        let source_ref = Arc::new(source);
262
263        let old_source = self.source_by_id.get(&id).unwrap();
264        // check if the source name gets updated.
265        if old_source.name != name
266            && let Some(src) = self.source_by_name.get(&old_source.name)
267            && src.id == id
268        {
269            self.source_by_name.remove(&old_source.name);
270        }
271
272        self.source_by_name.insert(name, source_ref.clone());
273        self.source_by_id.insert(id, source_ref);
274    }
275
276    pub fn create_sink(&mut self, prost: &PbSink) {
277        let name = prost.name.clone();
278        let id = prost.id;
279        let sink = SinkCatalog::from(prost);
280        let sink_ref = Arc::new(sink);
281
282        if let Some(connection_id) = sink_ref.connection_id {
283            self.connection_sink_ref
284                .entry(connection_id)
285                .and_modify(|sinks| sinks.push(id))
286                .or_insert(vec![id]);
287        }
288
289        if let Some(target_table) = sink_ref.target_table {
290            assert!(
291                self.table_incoming_sinks
292                    .entry(target_table)
293                    .or_default()
294                    .insert(sink_ref.id)
295            );
296        }
297
298        self.sink_by_name
299            .try_insert(name, sink_ref.clone())
300            .unwrap();
301        self.sink_by_id.try_insert(id, sink_ref).unwrap();
302    }
303
304    pub fn drop_sink(&mut self, id: SinkId) {
305        if let Some(sink_ref) = self.sink_by_id.remove(&id) {
306            self.sink_by_name.remove(&sink_ref.name).unwrap();
307            if let Some(connection_id) = sink_ref.connection_id
308                && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id)
309            {
310                let sink_ids = e.get_mut();
311                sink_ids.retain_mut(|sid| *sid != id);
312                if sink_ids.is_empty() {
313                    e.remove_entry();
314                }
315            }
316            if let Some(target_table) = sink_ref.target_table {
317                let incoming_sinks = self
318                    .table_incoming_sinks
319                    .get_mut(&target_table)
320                    .expect("should exists");
321                assert!(incoming_sinks.remove(&sink_ref.id));
322                if incoming_sinks.is_empty() {
323                    self.table_incoming_sinks.remove(&target_table);
324                }
325            }
326        } else {
327            tracing::warn!(
328                %id,
329                "sink not found when dropping, frontend might not be notified yet"
330            );
331        }
332    }
333
334    pub fn update_sink(&mut self, prost: &PbSink) {
335        let name = prost.name.clone();
336        let id = prost.id;
337        let sink = SinkCatalog::from(prost);
338        let sink_ref = Arc::new(sink);
339
340        let old_sink = self.sink_by_id.get(&id).unwrap();
341        assert_eq!(sink_ref.target_table, old_sink.target_table);
342        // check if the sink name gets updated.
343        if old_sink.name != name
344            && let Some(s) = self.sink_by_name.get(&old_sink.name)
345            && s.id == id
346        {
347            self.sink_by_name.remove(&old_sink.name);
348        }
349
350        self.sink_by_name.insert(name, sink_ref.clone());
351        self.sink_by_id.insert(id, sink_ref);
352    }
353
354    pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
355        self.table_incoming_sinks.get(&table_id)
356    }
357
358    pub fn create_subscription(&mut self, prost: &PbSubscription) {
359        let name = prost.name.clone();
360        let id = prost.id;
361        let subscription_catalog = SubscriptionCatalog::from(prost);
362        let subscription_ref = Arc::new(subscription_catalog);
363
364        self.subscription_by_name
365            .try_insert(name, subscription_ref.clone())
366            .unwrap();
367        self.subscription_by_id
368            .try_insert(id, subscription_ref)
369            .unwrap();
370    }
371
372    pub fn drop_subscription(&mut self, id: SubscriptionId) {
373        let subscription_ref = self.subscription_by_id.remove(&id);
374        if let Some(subscription_ref) = subscription_ref {
375            self.subscription_by_name.remove(&subscription_ref.name);
376        }
377    }
378
379    pub fn update_subscription(&mut self, prost: &PbSubscription) {
380        let name = prost.name.clone();
381        let id = prost.id;
382        let subscription = SubscriptionCatalog::from(prost);
383        let subscription_ref = Arc::new(subscription);
384
385        let old_subscription = self.subscription_by_id.get(&id).unwrap();
386        // check if the subscription name gets updated.
387        if old_subscription.name != name
388            && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
389            && s.id == id
390        {
391            self.subscription_by_name.remove(&old_subscription.name);
392        }
393
394        self.subscription_by_name
395            .insert(name, subscription_ref.clone());
396        self.subscription_by_id.insert(id, subscription_ref);
397    }
398
399    pub fn create_view(&mut self, prost: &PbView) {
400        let name = prost.name.clone();
401        let id = prost.id;
402        let view = ViewCatalog::from(prost);
403        let view_ref = Arc::new(view);
404
405        self.view_by_name
406            .try_insert(name, view_ref.clone())
407            .unwrap();
408        self.view_by_id.try_insert(id, view_ref).unwrap();
409    }
410
411    pub fn drop_view(&mut self, id: ViewId) {
412        let view_ref = self.view_by_id.remove(&id).unwrap();
413        self.view_by_name.remove(&view_ref.name).unwrap();
414    }
415
416    pub fn update_view(&mut self, prost: &PbView) {
417        let name = prost.name.clone();
418        let id = prost.id;
419        let view = ViewCatalog::from(prost);
420        let view_ref = Arc::new(view);
421
422        let old_view = self.view_by_id.get(&id).unwrap();
423        // check if the view name gets updated.
424        if old_view.name != name
425            && let Some(v) = self.view_by_name.get(old_view.name())
426            && v.id == id
427        {
428            self.view_by_name.remove(&old_view.name);
429        }
430
431        self.view_by_name.insert(name, view_ref.clone());
432        self.view_by_id.insert(id, view_ref);
433    }
434
435    pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
436        FuncSign {
437            name: FuncName::Udf(func.name.clone()),
438            inputs_type: func
439                .arg_types
440                .iter()
441                .map(|t| t.clone().into())
442                .collect_vec(),
443            variadic: false,
444            ret_type: func.return_type.clone().into(),
445            build: FuncBuilder::Udf,
446            // dummy type infer, will not use this result
447            type_infer: |_| Ok(DataType::Boolean),
448            deprecated: false,
449        }
450    }
451
452    pub fn create_function(&mut self, prost: &PbFunction) {
453        let name = prost.name.clone();
454        let id = prost.id;
455        let function = FunctionCatalog::from(prost);
456        let args = function.arg_types.clone();
457        let function_ref = Arc::new(function);
458
459        self.function_registry
460            .insert(Self::get_func_sign(&function_ref));
461        self.function_by_name
462            .entry(name)
463            .or_default()
464            .try_insert(args, function_ref.clone())
465            .expect("function already exists with same argument types");
466        self.function_by_id
467            .try_insert(id, function_ref)
468            .expect("function id exists");
469    }
470
471    pub fn drop_function(&mut self, id: FunctionId) {
472        let function_ref = self
473            .function_by_id
474            .remove(&id)
475            .expect("function not found by id");
476
477        self.function_registry
478            .remove(Self::get_func_sign(&function_ref))
479            .expect("function not found in registry");
480
481        self.function_by_name
482            .get_mut(&function_ref.name)
483            .expect("function not found by name")
484            .remove(&function_ref.arg_types)
485            .expect("function not found by argument types");
486    }
487
488    pub fn update_function(&mut self, prost: &PbFunction) {
489        let name = prost.name.clone();
490        let id = prost.id;
491        let function = FunctionCatalog::from(prost);
492        let function_ref = Arc::new(function);
493
494        let old_function_by_id = self.function_by_id.get(&id).unwrap();
495        let old_function_by_name = self
496            .function_by_name
497            .get_mut(&old_function_by_id.name)
498            .unwrap();
499        // check if the function name gets updated.
500        if old_function_by_id.name != name
501            && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
502            && f.id == id
503        {
504            old_function_by_name.remove(&old_function_by_id.arg_types);
505            if old_function_by_name.is_empty() {
506                self.function_by_name.remove(&old_function_by_id.name);
507            }
508        }
509
510        self.function_by_name
511            .entry(name)
512            .or_default()
513            .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
514        self.function_by_id.insert(id, function_ref);
515    }
516
517    pub fn create_connection(&mut self, prost: &PbConnection) {
518        let name = prost.name.clone();
519        let id = prost.id;
520        let connection = ConnectionCatalog::from(prost);
521        let connection_ref = Arc::new(connection);
522        self.connection_by_name
523            .try_insert(name, connection_ref.clone())
524            .unwrap();
525        self.connection_by_id
526            .try_insert(id, connection_ref)
527            .unwrap();
528    }
529
530    pub fn update_connection(&mut self, prost: &PbConnection) {
531        let name = prost.name.clone();
532        let id = prost.id;
533        let connection = ConnectionCatalog::from(prost);
534        let connection_ref = Arc::new(connection);
535
536        let old_connection = self.connection_by_id.get(&id).unwrap();
537        // check if the connection name gets updated.
538        if old_connection.name != name
539            && let Some(conn) = self.connection_by_name.get(&old_connection.name)
540            && conn.id == id
541        {
542            self.connection_by_name.remove(&old_connection.name);
543        }
544
545        self.connection_by_name.insert(name, connection_ref.clone());
546        self.connection_by_id.insert(id, connection_ref);
547    }
548
549    pub fn drop_connection(&mut self, connection_id: ConnectionId) {
550        let connection_ref = self
551            .connection_by_id
552            .remove(&connection_id)
553            .expect("connection not found by id");
554        self.connection_by_name
555            .remove(&connection_ref.name)
556            .expect("connection not found by name");
557    }
558
559    pub fn create_secret(&mut self, prost: &PbSecret) {
560        let name = prost.name.clone();
561        let id = prost.id;
562        let secret = SecretCatalog::from(prost);
563        let secret_ref = Arc::new(secret);
564
565        self.secret_by_id
566            .try_insert(id, secret_ref.clone())
567            .unwrap();
568        self.secret_by_name.try_insert(name, secret_ref).unwrap();
569    }
570
571    pub fn update_secret(&mut self, prost: &PbSecret) {
572        let name = prost.name.clone();
573        let id = prost.id;
574        let secret = SecretCatalog::from(prost);
575        let secret_ref = Arc::new(secret);
576
577        let old_secret = self.secret_by_id.get(&id).unwrap();
578        // check if the secret name gets updated.
579        if old_secret.name != name
580            && let Some(s) = self.secret_by_name.get(&old_secret.name)
581            && s.id == id
582        {
583            self.secret_by_name.remove(&old_secret.name);
584        }
585
586        self.secret_by_name.insert(name, secret_ref.clone());
587        self.secret_by_id.insert(id, secret_ref);
588    }
589
590    pub fn drop_secret(&mut self, secret_id: SecretId) {
591        let secret_ref = self
592            .secret_by_id
593            .remove(&secret_id)
594            .expect("secret not found by id");
595        self.secret_by_name
596            .remove(&secret_ref.name)
597            .expect("secret not found by name");
598    }
599
600    pub fn iter_object_ids(&self) -> impl Iterator<Item = ObjectId> + '_ {
601        self.table_by_id
602            .keys()
603            .map(|id| id.as_object_id())
604            .chain(self.source_by_id.keys().map(|id| id.as_object_id()))
605            .chain(self.sink_by_id.keys().map(|id| id.as_object_id()))
606            .chain(self.subscription_by_id.keys().map(|id| id.as_object_id()))
607            .chain(self.index_by_id.keys().map(|id| id.as_object_id()))
608            .chain(self.view_by_id.keys().map(|id| id.as_object_id()))
609            .chain(self.function_by_id.keys().map(|id| id.as_object_id()))
610            .chain(self.connection_by_id.keys().map(|id| id.as_object_id()))
611            .chain(self.secret_by_id.keys().map(|id| id.as_object_id()))
612    }
613
614    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
615        self.table_by_name.values()
616    }
617
618    pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
619        self.table_by_name.values().filter(|v| v.is_user_table())
620    }
621
622    pub fn iter_user_table_with_acl<'a>(
623        &'a self,
624        user: &'a UserCatalog,
625    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
626        self.table_by_name
627            .values()
628            .filter(|v| v.is_user_table() && has_access_to_object(user, v.id, v.owner))
629    }
630
631    pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
632        self.table_by_name
633            .values()
634            .filter(|v| v.is_internal_table())
635    }
636
637    pub fn iter_internal_table_with_acl<'a>(
638        &'a self,
639        user: &'a UserCatalog,
640    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
641        self.table_by_name
642            .values()
643            .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
644    }
645
646    /// Iterate all non-internal tables, including user tables, materialized views and indices.
647    pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
648        self.table_by_name
649            .values()
650            .filter(|v| !v.is_internal_table())
651    }
652
653    pub fn iter_table_mv_indices_with_acl<'a>(
654        &'a self,
655        user: &'a UserCatalog,
656    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
657        self.table_by_name
658            .values()
659            .filter(|v| !v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
660    }
661
662    /// Iterate all materialized views, excluding the indices.
663    pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
664        self.table_by_name.values().filter(|v| v.is_mview())
665    }
666
667    pub fn iter_all_mvs_with_acl<'a>(
668        &'a self,
669        user: &'a UserCatalog,
670    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
671        self.table_by_name
672            .values()
673            .filter(|v| v.is_mview() && has_access_to_object(user, v.id, v.owner))
674    }
675
676    /// Iterate created materialized views, excluding the indices.
677    pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
678        self.table_by_name
679            .values()
680            .filter(|v| v.is_mview() && v.is_created())
681    }
682
683    pub fn iter_created_mvs_with_acl<'a>(
684        &'a self,
685        user: &'a UserCatalog,
686    ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
687        self.table_by_name
688            .values()
689            .filter(|v| v.is_mview() && v.is_created() && has_access_to_object(user, v.id, v.owner))
690    }
691
692    /// Iterate all indices
693    pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
694        self.index_by_name.values()
695    }
696
697    pub fn iter_index_with_acl<'a>(
698        &'a self,
699        user: &'a UserCatalog,
700    ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
701        self.index_by_name
702            .values()
703            .filter(|idx| has_access_to_object(user, idx.id, idx.owner()))
704    }
705
706    /// Iterate all sources
707    pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
708        self.source_by_name.values()
709    }
710
711    pub fn iter_source_with_acl<'a>(
712        &'a self,
713        user: &'a UserCatalog,
714    ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
715        self.source_by_name
716            .values()
717            .filter(|s| has_access_to_object(user, s.id, s.owner))
718    }
719
720    pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
721        self.sink_by_name.values()
722    }
723
724    pub fn iter_sink_with_acl<'a>(
725        &'a self,
726        user: &'a UserCatalog,
727    ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
728        self.sink_by_name
729            .values()
730            .filter(|s| has_access_to_object(user, s.id, s.owner))
731    }
732
733    pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
734        self.subscription_by_name.values()
735    }
736
737    pub fn iter_subscription_with_acl<'a>(
738        &'a self,
739        user: &'a UserCatalog,
740    ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
741        self.subscription_by_name
742            .values()
743            .filter(|s| has_access_to_object(user, s.id, s.owner))
744    }
745
746    pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
747        self.view_by_name.values()
748    }
749
750    pub fn iter_view_with_acl<'a>(
751        &'a self,
752        user: &'a UserCatalog,
753    ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
754        self.view_by_name
755            .values()
756            .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
757    }
758
759    pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
760        self.function_by_name.values().flat_map(|v| v.values())
761    }
762
763    pub fn iter_function_with_acl<'a>(
764        &'a self,
765        user: &'a UserCatalog,
766    ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
767        self.function_by_name
768            .values()
769            .flat_map(|v| v.values())
770            .filter(|f| has_access_to_object(user, f.id, f.owner))
771    }
772
773    pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
774        self.connection_by_name.values()
775    }
776
777    pub fn iter_connections_with_acl<'a>(
778        &'a self,
779        user: &'a UserCatalog,
780    ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
781        self.connection_by_name
782            .values()
783            .filter(|c| has_access_to_object(user, c.id, c.owner))
784    }
785
786    pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
787        self.secret_by_name.values()
788    }
789
790    pub fn iter_secret_with_acl<'a>(
791        &'a self,
792        user: &'a UserCatalog,
793    ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
794        self.secret_by_name
795            .values()
796            .filter(|s| has_access_to_object(user, s.id, s.owner))
797    }
798
799    pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
800        self.system_table_by_name.values()
801    }
802
803    pub fn get_table_by_name(
804        &self,
805        table_name: &str,
806        bind_creating_relations: bool,
807    ) -> Option<&Arc<TableCatalog>> {
808        self.table_by_name
809            .get(table_name)
810            .filter(|&table| bind_creating_relations || table.is_created())
811    }
812
813    pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
814        self.get_table_by_name(table_name, true)
815    }
816
817    pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
818        self.get_table_by_name(table_name, false)
819    }
820
821    pub fn get_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
822        self.table_by_id.get(&table_id)
823    }
824
825    pub fn get_created_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
826        self.table_by_id
827            .get(&table_id)
828            .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
829    }
830
831    pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
832        self.view_by_name.get(view_name)
833    }
834
835    pub fn get_view_by_id(&self, view_id: ViewId) -> Option<&Arc<ViewCatalog>> {
836        self.view_by_id.get(&view_id)
837    }
838
839    pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
840        self.source_by_name.get(source_name)
841    }
842
843    pub fn get_source_by_id(&self, source_id: SourceId) -> Option<&Arc<SourceCatalog>> {
844        self.source_by_id.get(&source_id)
845    }
846
847    pub fn get_sink_by_name(
848        &self,
849        sink_name: &str,
850        bind_creating: bool,
851    ) -> Option<&Arc<SinkCatalog>> {
852        self.sink_by_name
853            .get(sink_name)
854            .filter(|s| bind_creating || s.is_created())
855    }
856
857    pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
858        self.get_sink_by_name(sink_name, true)
859    }
860
861    pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
862        self.get_sink_by_name(sink_name, false)
863    }
864
865    pub fn get_sink_by_id(&self, sink_id: SinkId) -> Option<&Arc<SinkCatalog>> {
866        self.sink_by_id.get(&sink_id)
867    }
868
869    pub fn get_subscription_by_name(
870        &self,
871        subscription_name: &str,
872    ) -> Option<&Arc<SubscriptionCatalog>> {
873        self.subscription_by_name.get(subscription_name)
874    }
875
876    pub fn get_subscription_by_id(
877        &self,
878        subscription_id: SubscriptionId,
879    ) -> Option<&Arc<SubscriptionCatalog>> {
880        self.subscription_by_id.get(&subscription_id)
881    }
882
883    pub fn get_index_by_name(
884        &self,
885        index_name: &str,
886        bind_creating: bool,
887    ) -> Option<&Arc<IndexCatalog>> {
888        self.index_by_name
889            .get(index_name)
890            .filter(|i| bind_creating || i.is_created())
891    }
892
893    pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
894        self.get_index_by_name(index_name, true)
895    }
896
897    pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
898        self.get_index_by_name(index_name, false)
899    }
900
901    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&Arc<IndexCatalog>> {
902        self.index_by_id.get(&index_id)
903    }
904
905    pub fn get_indexes_by_table_id(
906        &self,
907        table_id: TableId,
908        include_creating: bool,
909    ) -> Vec<Arc<IndexCatalog>> {
910        self.indexes_by_table_id
911            .get(&table_id)
912            .cloned()
913            .unwrap_or_default()
914            .into_iter()
915            .filter(|i| include_creating || i.is_created())
916            .collect()
917    }
918
919    pub fn get_any_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
920        self.get_indexes_by_table_id(table_id, true)
921    }
922
923    pub fn get_created_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
924        self.get_indexes_by_table_id(table_id, false)
925    }
926
927    pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
928        self.system_table_by_name.get(table_name)
929    }
930
931    pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
932        self.table_by_id
933            .get(&table_id)
934            .map(|table| table.name.clone())
935    }
936
937    pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
938        self.function_by_id.get(&function_id)
939    }
940
941    pub fn get_function_by_name_inputs(
942        &self,
943        name: &str,
944        inputs: &mut [ExprImpl],
945    ) -> Option<&Arc<FunctionCatalog>> {
946        infer_type_with_sigmap(
947            FuncName::Udf(name.to_owned()),
948            inputs,
949            &self.function_registry,
950        )
951        .ok()?;
952        let args = inputs.iter().map(|x| x.return_type()).collect_vec();
953        self.function_by_name.get(name)?.get(&args)
954    }
955
956    pub fn get_function_by_name_args(
957        &self,
958        name: &str,
959        args: &[DataType],
960    ) -> Option<&Arc<FunctionCatalog>> {
961        let args = args.iter().map(|x| Some(x.clone())).collect_vec();
962        let func = infer_type_name(
963            &self.function_registry,
964            FuncName::Udf(name.to_owned()),
965            &args,
966        )
967        .ok()?;
968
969        let args = func
970            .inputs_type
971            .iter()
972            .filter_map(|x| {
973                if let SigDataType::Exact(t) = x {
974                    Some(t.clone())
975                } else {
976                    None
977                }
978            })
979            .collect_vec();
980
981        self.function_by_name.get(name)?.get(&args)
982    }
983
984    pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
985        let functions = self.function_by_name.get(name)?;
986        if functions.is_empty() {
987            return None;
988        }
989        Some(functions.values().collect())
990    }
991
992    pub fn get_connection_by_id(
993        &self,
994        connection_id: ConnectionId,
995    ) -> Option<&Arc<ConnectionCatalog>> {
996        self.connection_by_id.get(&connection_id)
997    }
998
999    pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
1000        self.connection_by_name.get(connection_name)
1001    }
1002
1003    pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
1004        self.secret_by_name.get(secret_name)
1005    }
1006
1007    pub fn get_secret_by_id(&self, secret_id: SecretId) -> Option<&Arc<SecretCatalog>> {
1008        self.secret_by_id.get(&secret_id)
1009    }
1010
1011    /// get all sources referencing the connection
1012    pub fn get_source_ids_by_connection(
1013        &self,
1014        connection_id: ConnectionId,
1015    ) -> Option<Vec<SourceId>> {
1016        self.connection_source_ref
1017            .get(&connection_id)
1018            .map(|c| c.to_owned())
1019    }
1020
1021    /// get all sinks referencing the connection
1022    pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1023        self.connection_sink_ref
1024            .get(&connection_id)
1025            .map(|s| s.to_owned())
1026    }
1027
1028    pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
1029        #[allow(clippy::manual_map)]
1030        if let Some(table) = self.get_created_table_by_id(oid.as_table_id()) {
1031            Some(OwnedGrantObject {
1032                owner: table.owner,
1033                object: Object::TableId(oid.as_table_id()),
1034            })
1035        } else if let Some(index) = self.get_index_by_id(oid.as_index_id()) {
1036            Some(OwnedGrantObject {
1037                owner: index.owner(),
1038                object: Object::TableId(oid.as_table_id()),
1039            })
1040        } else if let Some(source) = self.get_source_by_id(oid.as_source_id()) {
1041            Some(OwnedGrantObject {
1042                owner: source.owner,
1043                object: Object::SourceId(oid.as_source_id()),
1044            })
1045        } else if let Some(sink) = self.get_sink_by_id(oid.as_sink_id()) {
1046            Some(OwnedGrantObject {
1047                owner: sink.owner,
1048                object: Object::SinkId(oid.as_sink_id()),
1049            })
1050        } else if let Some(view) = self.get_view_by_id(oid.as_view_id()) {
1051            Some(OwnedGrantObject {
1052                owner: view.owner,
1053                object: Object::ViewId(oid.as_view_id()),
1054            })
1055        } else if let Some(function) = self.get_function_by_id(oid.as_function_id()) {
1056            Some(OwnedGrantObject {
1057                owner: function.owner(),
1058                object: Object::FunctionId(oid.as_function_id()),
1059            })
1060        } else if let Some(subscription) = self.get_subscription_by_id(oid.as_subscription_id()) {
1061            Some(OwnedGrantObject {
1062                owner: subscription.owner,
1063                object: Object::SubscriptionId(oid.as_subscription_id()),
1064            })
1065        } else if let Some(connection) = self.get_connection_by_id(oid.as_connection_id()) {
1066            Some(OwnedGrantObject {
1067                owner: connection.owner,
1068                object: Object::ConnectionId(oid.as_connection_id()),
1069            })
1070        } else if let Some(secret) = self.get_secret_by_id(oid.as_secret_id()) {
1071            Some(OwnedGrantObject {
1072                owner: secret.owner,
1073                object: Object::SecretId(oid.as_secret_id()),
1074            })
1075        } else {
1076            None
1077        }
1078    }
1079
1080    pub fn contains_object(&self, oid: ObjectId) -> bool {
1081        self.table_by_id.contains_key(&oid.as_table_id())
1082            || self.index_by_id.contains_key(&oid.as_index_id())
1083            || self.source_by_id.contains_key(&oid.as_source_id())
1084            || self.sink_by_id.contains_key(&oid.as_sink_id())
1085            || self.view_by_id.contains_key(&oid.as_view_id())
1086            || self.function_by_id.contains_key(&oid.as_function_id())
1087            || self
1088                .subscription_by_id
1089                .contains_key(&oid.as_subscription_id())
1090            || self.connection_by_id.contains_key(&oid.as_connection_id())
1091    }
1092
1093    pub fn id(&self) -> SchemaId {
1094        self.id
1095    }
1096
1097    pub fn database_id(&self) -> DatabaseId {
1098        self.database_id
1099    }
1100
1101    pub fn name(&self) -> String {
1102        self.name.clone()
1103    }
1104}
1105
1106impl OwnedByUserCatalog for SchemaCatalog {
1107    fn owner(&self) -> UserId {
1108        self.owner
1109    }
1110}
1111
1112impl From<&PbSchema> for SchemaCatalog {
1113    fn from(schema: &PbSchema) -> Self {
1114        Self {
1115            id: schema.id,
1116            owner: schema.owner,
1117            name: schema.name.clone(),
1118            database_id: schema.database_id,
1119            table_by_name: HashMap::new(),
1120            table_by_id: HashMap::new(),
1121            source_by_name: HashMap::new(),
1122            source_by_id: HashMap::new(),
1123            sink_by_name: HashMap::new(),
1124            sink_by_id: HashMap::new(),
1125            table_incoming_sinks: HashMap::new(),
1126            index_by_name: HashMap::new(),
1127            index_by_id: HashMap::new(),
1128            indexes_by_table_id: HashMap::new(),
1129            system_table_by_name: HashMap::new(),
1130            view_by_name: HashMap::new(),
1131            view_by_id: HashMap::new(),
1132            function_registry: FunctionRegistry::default(),
1133            function_by_name: HashMap::new(),
1134            function_by_id: HashMap::new(),
1135            connection_by_name: HashMap::new(),
1136            connection_by_id: HashMap::new(),
1137            secret_by_name: HashMap::new(),
1138            secret_by_id: HashMap::new(),
1139            _secret_source_ref: HashMap::new(),
1140            _secret_sink_ref: HashMap::new(),
1141            connection_source_ref: HashMap::new(),
1142            connection_sink_ref: HashMap::new(),
1143            subscription_by_name: HashMap::new(),
1144            subscription_by_id: HashMap::new(),
1145        }
1146    }
1147}