1use std::collections::HashMap;
16use std::collections::hash_map::Entry::{Occupied, Vacant};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
/// In-memory catalog of the objects that live in one schema.
///
/// Most object kinds are indexed twice, once by name and once by id; the `create_*`,
/// `update_*` and `drop_*` methods store the same `Arc` in both maps of a pair.
#[derive(Clone, Debug)]
pub struct SchemaCatalog {
    /// Id of this schema, exposed through [`SchemaCatalog::id`].
    id: SchemaId,
    /// Name of this schema.
    pub name: String,
    /// Id of the database this schema belongs to.
    pub database_id: DatabaseId,
    /// Tables by name. The `iter_*` methods filter this map with `is_user_table`,
    /// `is_internal_table` and `is_mview`, so it is not limited to user tables.
    table_by_name: HashMap<String, Arc<TableCatalog>>,
    /// Same entries as `table_by_name`, keyed by table id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Sources by name.
    source_by_name: HashMap<String, Arc<SourceCatalog>>,
    /// Sources by id.
    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
    /// Sinks by name.
    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
    /// Sinks by id.
    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
    /// Subscriptions by name.
    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
    /// Subscriptions by id.
    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
    /// Indexes by name.
    index_by_name: HashMap<String, Arc<IndexCatalog>>,
    /// Indexes by id.
    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
    /// Indexes grouped by the id of their primary table.
    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
    /// Views by name; also receives the views registered through `create_sys_view`.
    view_by_name: HashMap<String, Arc<ViewCatalog>>,
    /// Views by id.
    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
    /// Signatures of the functions registered through `create_function`, consulted when
    /// resolving a function from its call-site arguments.
    function_registry: FunctionRegistry,
    /// Functions by name, then by argument types (one name may have several overloads).
    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
    /// Functions by id.
    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
    /// Connections by name.
    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
    /// Connections by id.
    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
    /// Secrets by name.
    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
    /// Secrets by id.
    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,

    /// Created empty; no method in this file reads or writes it.
    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
    /// Created empty; no method in this file reads or writes it.
    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,

    /// Ids of the sources that use each connection; maintained by `create_source` and
    /// `drop_source`.
    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
    /// Ids of the sinks that use each connection; maintained by `create_sink` and `drop_sink`.
    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
    /// System tables by name; filled only through `create_sys_table`.
    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
    /// User id of the schema owner, returned by `OwnedByUserCatalog::owner`.
    pub owner: u32,
}
84
85impl SchemaCatalog {
86 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
87 let name = prost.name.clone();
88 let id = prost.id.into();
89 let table: TableCatalog = prost.into();
90 let table_ref = Arc::new(table);
91
92 self.table_by_name
93 .try_insert(name, table_ref.clone())
94 .unwrap();
95 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
96 table_ref
97 }
98
99 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
100 self.system_table_by_name
101 .try_insert(sys_table.name.clone(), sys_table)
102 .unwrap();
103 }
104
105 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
106 self.view_by_name
107 .try_insert(sys_view.name().to_owned(), sys_view.clone())
108 .unwrap();
109 self.view_by_id
110 .try_insert(sys_view.id, sys_view.clone())
111 .unwrap();
112 }
113
114 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
115 let name = prost.name.clone();
116 let id = prost.id.into();
117 let table: TableCatalog = prost.into();
118 let table_ref = Arc::new(table);
119
120 let old_table = self.table_by_id.get(&id).unwrap();
121 if old_table.name() != name
123 && let Some(t) = self.table_by_name.get(old_table.name())
124 && t.id == id
125 {
126 self.table_by_name.remove(old_table.name());
127 }
128
129 self.table_by_name.insert(name, table_ref.clone());
130 self.table_by_id.insert(id, table_ref.clone());
131 table_ref
132 }
133
134 pub fn update_index(&mut self, prost: &PbIndex) {
135 let name = prost.name.clone();
136 let id = prost.id.into();
137 let old_index = self.index_by_id.get(&id).unwrap();
138 let index_table = self
139 .get_created_table_by_id(&prost.index_table_id.into())
140 .unwrap();
141 let primary_table = self
142 .get_created_table_by_id(&prost.primary_table_id.into())
143 .unwrap();
144 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145 let index_ref = Arc::new(index);
146
147 if old_index.name != name
149 && let Some(idx) = self.index_by_name.get(&old_index.name)
150 && idx.id == id
151 {
152 self.index_by_name.remove(&old_index.name);
153 }
154 self.index_by_name.insert(name, index_ref.clone());
155 self.index_by_id.insert(id, index_ref.clone());
156
157 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158 Occupied(mut entry) => {
159 let pos = entry
160 .get()
161 .iter()
162 .position(|x| x.id == index_ref.id)
163 .unwrap();
164 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165 }
166 Vacant(_entry) => {
167 unreachable!()
168 }
169 };
170 }
171
172 pub fn drop_table(&mut self, id: TableId) {
173 if let Some(table_ref) = self.table_by_id.remove(&id) {
174 self.table_by_name.remove(&table_ref.name).unwrap();
175 self.indexes_by_table_id.remove(&table_ref.id);
176 } else {
177 tracing::warn!(
178 id = ?id.table_id,
179 "table not found when dropping, frontend might not be notified yet"
180 );
181 }
182 }
183
184 pub fn create_index(&mut self, prost: &PbIndex) {
185 let name = prost.name.clone();
186 let id = prost.id.into();
187 let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
188 let primary_table = self
189 .get_created_table_by_id(&prost.primary_table_id.into())
190 .unwrap();
191 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192 let index_ref = Arc::new(index);
193
194 self.index_by_name
195 .try_insert(name, index_ref.clone())
196 .unwrap();
197 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199 Occupied(mut entry) => {
200 entry.get_mut().push(index_ref);
201 }
202 Vacant(entry) => {
203 entry.insert(vec![index_ref]);
204 }
205 };
206 }
207
208 pub fn drop_index(&mut self, id: IndexId) {
209 let index_ref = self.index_by_id.remove(&id).unwrap();
210 self.index_by_name.remove(&index_ref.name).unwrap();
211 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212 Occupied(mut entry) => {
213 let pos = entry
214 .get_mut()
215 .iter()
216 .position(|x| x.id == index_ref.id)
217 .unwrap();
218 entry.get_mut().remove(pos);
219 }
220 Vacant(_entry) => (),
221 };
222 }
223
224 pub fn create_source(&mut self, prost: &PbSource) {
225 let name = prost.name.clone();
226 let id = prost.id;
227 let source = SourceCatalog::from(prost);
228 let source_ref = Arc::new(source);
229
230 if let Some(connection_id) = source_ref.connection_id {
231 self.connection_source_ref
232 .entry(connection_id)
233 .and_modify(|sources| sources.push(source_ref.id))
234 .or_insert(vec![source_ref.id]);
235 }
236
237 self.source_by_name
238 .try_insert(name, source_ref.clone())
239 .unwrap();
240 self.source_by_id.try_insert(id, source_ref).unwrap();
241 }
242
243 pub fn drop_source(&mut self, id: SourceId) {
244 let source_ref = self.source_by_id.remove(&id).unwrap();
245 self.source_by_name.remove(&source_ref.name).unwrap();
246 if let Some(connection_id) = source_ref.connection_id {
247 if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
248 let source_ids = e.get_mut();
249 source_ids.retain_mut(|sid| *sid != id);
250 if source_ids.is_empty() {
251 e.remove_entry();
252 }
253 }
254 }
255 }
256
257 pub fn update_source(&mut self, prost: &PbSource) {
258 let name = prost.name.clone();
259 let id = prost.id;
260 let source = SourceCatalog::from(prost);
261 let source_ref = Arc::new(source);
262
263 let old_source = self.source_by_id.get(&id).unwrap();
264 if old_source.name != name
266 && let Some(src) = self.source_by_name.get(&old_source.name)
267 && src.id == id
268 {
269 self.source_by_name.remove(&old_source.name);
270 }
271
272 self.source_by_name.insert(name, source_ref.clone());
273 self.source_by_id.insert(id, source_ref);
274 }
275
276 pub fn create_sink(&mut self, prost: &PbSink) {
277 let name = prost.name.clone();
278 let id = prost.id;
279 let sink = SinkCatalog::from(prost);
280 let sink_ref = Arc::new(sink);
281
282 if let Some(connection_id) = sink_ref.connection_id {
283 self.connection_sink_ref
284 .entry(connection_id.0)
285 .and_modify(|sinks| sinks.push(id))
286 .or_insert(vec![id]);
287 }
288
289 self.sink_by_name
290 .try_insert(name, sink_ref.clone())
291 .unwrap();
292 self.sink_by_id.try_insert(id, sink_ref).unwrap();
293 }
294
295 pub fn drop_sink(&mut self, id: SinkId) {
296 let sink_ref = self.sink_by_id.remove(&id).unwrap();
297 self.sink_by_name.remove(&sink_ref.name).unwrap();
298 if let Some(connection_id) = sink_ref.connection_id {
299 if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
300 let sink_ids = e.get_mut();
301 sink_ids.retain_mut(|sid| *sid != id);
302 if sink_ids.is_empty() {
303 e.remove_entry();
304 }
305 }
306 }
307 }
308
309 pub fn update_sink(&mut self, prost: &PbSink) {
310 let name = prost.name.clone();
311 let id = prost.id;
312 let sink = SinkCatalog::from(prost);
313 let sink_ref = Arc::new(sink);
314
315 let old_sink = self.sink_by_id.get(&id).unwrap();
316 if old_sink.name != name
318 && let Some(s) = self.sink_by_name.get(&old_sink.name)
319 && s.id.sink_id == id
320 {
321 self.sink_by_name.remove(&old_sink.name);
322 }
323
324 self.sink_by_name.insert(name, sink_ref.clone());
325 self.sink_by_id.insert(id, sink_ref);
326 }
327
328 pub fn create_subscription(&mut self, prost: &PbSubscription) {
329 let name = prost.name.clone();
330 let id = prost.id;
331 let subscription_catalog = SubscriptionCatalog::from(prost);
332 let subscription_ref = Arc::new(subscription_catalog);
333
334 self.subscription_by_name
335 .try_insert(name, subscription_ref.clone())
336 .unwrap();
337 self.subscription_by_id
338 .try_insert(id, subscription_ref)
339 .unwrap();
340 }
341
342 pub fn drop_subscription(&mut self, id: SubscriptionId) {
343 let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
344 self.subscription_by_name
345 .remove(&subscription_ref.name)
346 .unwrap();
347 }
348
349 pub fn update_subscription(&mut self, prost: &PbSubscription) {
350 let name = prost.name.clone();
351 let id = prost.id;
352 let subscription = SubscriptionCatalog::from(prost);
353 let subscription_ref = Arc::new(subscription);
354
355 let old_subscription = self.subscription_by_id.get(&id).unwrap();
356 if old_subscription.name != name
358 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
359 && s.id.subscription_id == id
360 {
361 self.subscription_by_name.remove(&old_subscription.name);
362 }
363
364 self.subscription_by_name
365 .insert(name, subscription_ref.clone());
366 self.subscription_by_id.insert(id, subscription_ref);
367 }
368
369 pub fn create_view(&mut self, prost: &PbView) {
370 let name = prost.name.clone();
371 let id = prost.id;
372 let view = ViewCatalog::from(prost);
373 let view_ref = Arc::new(view);
374
375 self.view_by_name
376 .try_insert(name, view_ref.clone())
377 .unwrap();
378 self.view_by_id.try_insert(id, view_ref).unwrap();
379 }
380
381 pub fn drop_view(&mut self, id: ViewId) {
382 let view_ref = self.view_by_id.remove(&id).unwrap();
383 self.view_by_name.remove(&view_ref.name).unwrap();
384 }
385
386 pub fn update_view(&mut self, prost: &PbView) {
387 let name = prost.name.clone();
388 let id = prost.id;
389 let view = ViewCatalog::from(prost);
390 let view_ref = Arc::new(view);
391
392 let old_view = self.view_by_id.get(&id).unwrap();
393 if old_view.name != name
395 && let Some(v) = self.view_by_name.get(old_view.name())
396 && v.id == id
397 {
398 self.view_by_name.remove(&old_view.name);
399 }
400
401 self.view_by_name.insert(name, view_ref.clone());
402 self.view_by_id.insert(id, view_ref);
403 }
404
405 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
406 FuncSign {
407 name: FuncName::Udf(func.name.clone()),
408 inputs_type: func
409 .arg_types
410 .iter()
411 .map(|t| t.clone().into())
412 .collect_vec(),
413 variadic: false,
414 ret_type: func.return_type.clone().into(),
415 build: FuncBuilder::Udf,
416 type_infer: |_| Ok(DataType::Boolean),
418 deprecated: false,
419 }
420 }
421
422 pub fn create_function(&mut self, prost: &PbFunction) {
423 let name = prost.name.clone();
424 let id = prost.id;
425 let function = FunctionCatalog::from(prost);
426 let args = function.arg_types.clone();
427 let function_ref = Arc::new(function);
428
429 self.function_registry
430 .insert(Self::get_func_sign(&function_ref));
431 self.function_by_name
432 .entry(name)
433 .or_default()
434 .try_insert(args, function_ref.clone())
435 .expect("function already exists with same argument types");
436 self.function_by_id
437 .try_insert(id.into(), function_ref)
438 .expect("function id exists");
439 }
440
441 pub fn drop_function(&mut self, id: FunctionId) {
442 let function_ref = self
443 .function_by_id
444 .remove(&id)
445 .expect("function not found by id");
446
447 self.function_registry
448 .remove(Self::get_func_sign(&function_ref))
449 .expect("function not found in registry");
450
451 self.function_by_name
452 .get_mut(&function_ref.name)
453 .expect("function not found by name")
454 .remove(&function_ref.arg_types)
455 .expect("function not found by argument types");
456 }
457
458 pub fn update_function(&mut self, prost: &PbFunction) {
459 let name = prost.name.clone();
460 let id = prost.id.into();
461 let function = FunctionCatalog::from(prost);
462 let function_ref = Arc::new(function);
463
464 let old_function_by_id = self.function_by_id.get(&id).unwrap();
465 let old_function_by_name = self
466 .function_by_name
467 .get_mut(&old_function_by_id.name)
468 .unwrap();
469 if old_function_by_id.name != name
471 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
472 && f.id == id
473 {
474 old_function_by_name.remove(&old_function_by_id.arg_types);
475 if old_function_by_name.is_empty() {
476 self.function_by_name.remove(&old_function_by_id.name);
477 }
478 }
479
480 self.function_by_name
481 .entry(name)
482 .or_default()
483 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
484 self.function_by_id.insert(id, function_ref);
485 }
486
487 pub fn create_connection(&mut self, prost: &PbConnection) {
488 let name = prost.name.clone();
489 let id = prost.id;
490 let connection = ConnectionCatalog::from(prost);
491 let connection_ref = Arc::new(connection);
492 self.connection_by_name
493 .try_insert(name, connection_ref.clone())
494 .unwrap();
495 self.connection_by_id
496 .try_insert(id, connection_ref)
497 .unwrap();
498 }
499
500 pub fn update_connection(&mut self, prost: &PbConnection) {
501 let name = prost.name.clone();
502 let id = prost.id;
503 let connection = ConnectionCatalog::from(prost);
504 let connection_ref = Arc::new(connection);
505
506 let old_connection = self.connection_by_id.get(&id).unwrap();
507 if old_connection.name != name
509 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
510 && conn.id == id
511 {
512 self.connection_by_name.remove(&old_connection.name);
513 }
514
515 self.connection_by_name.insert(name, connection_ref.clone());
516 self.connection_by_id.insert(id, connection_ref);
517 }
518
519 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
520 let connection_ref = self
521 .connection_by_id
522 .remove(&connection_id)
523 .expect("connection not found by id");
524 self.connection_by_name
525 .remove(&connection_ref.name)
526 .expect("connection not found by name");
527 }
528
529 pub fn create_secret(&mut self, prost: &PbSecret) {
530 let name = prost.name.clone();
531 let id = SecretId::new(prost.id);
532 let secret = SecretCatalog::from(prost);
533 let secret_ref = Arc::new(secret);
534
535 self.secret_by_id
536 .try_insert(id, secret_ref.clone())
537 .unwrap();
538 self.secret_by_name
539 .try_insert(name, secret_ref.clone())
540 .unwrap();
541 }
542
543 pub fn update_secret(&mut self, prost: &PbSecret) {
544 let name = prost.name.clone();
545 let id = SecretId::new(prost.id);
546 let secret = SecretCatalog::from(prost);
547 let secret_ref = Arc::new(secret);
548
549 let old_secret = self.secret_by_id.get(&id).unwrap();
550 if old_secret.name != name
552 && let Some(s) = self.secret_by_name.get(&old_secret.name)
553 && s.id == id
554 {
555 self.secret_by_name.remove(&old_secret.name);
556 }
557
558 self.secret_by_name.insert(name, secret_ref.clone());
559 self.secret_by_id.insert(id, secret_ref);
560 }
561
562 pub fn drop_secret(&mut self, secret_id: SecretId) {
563 let secret_ref = self
564 .secret_by_id
565 .remove(&secret_id)
566 .expect("secret not found by id");
567 self.secret_by_name
568 .remove(&secret_ref.name)
569 .expect("secret not found by name");
570 }
571
572 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
573 self.table_by_name.values()
574 }
575
576 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
577 self.table_by_name.values().filter(|v| v.is_user_table())
578 }
579
580 pub fn iter_user_table_with_acl<'a>(
581 &'a self,
582 user: &'a UserCatalog,
583 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
584 self.table_by_name.values().filter(|v| {
585 v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
586 })
587 }
588
589 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
590 self.table_by_name
591 .values()
592 .filter(|v| v.is_internal_table())
593 }
594
595 pub fn iter_internal_table_with_acl<'a>(
596 &'a self,
597 user: &'a UserCatalog,
598 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
599 self.table_by_name.values().filter(|v| {
600 v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
601 })
602 }
603
604 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
606 self.table_by_name
607 .values()
608 .filter(|v| !v.is_internal_table())
609 }
610
611 pub fn iter_table_mv_indices_with_acl<'a>(
612 &'a self,
613 user: &'a UserCatalog,
614 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
615 self.table_by_name.values().filter(|v| {
616 !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
617 })
618 }
619
620 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
622 self.table_by_name.values().filter(|v| v.is_mview())
623 }
624
625 pub fn iter_all_mvs_with_acl<'a>(
626 &'a self,
627 user: &'a UserCatalog,
628 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
629 self.table_by_name.values().filter(|v| {
630 v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
631 })
632 }
633
634 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
636 self.table_by_name
637 .values()
638 .filter(|v| v.is_mview() && v.is_created())
639 }
640
641 pub fn iter_created_mvs_with_acl<'a>(
642 &'a self,
643 user: &'a UserCatalog,
644 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
645 self.table_by_name.values().filter(|v| {
646 v.is_mview()
647 && v.is_created()
648 && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
649 })
650 }
651
652 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
654 self.index_by_name.values()
655 }
656
657 pub fn iter_index_with_acl<'a>(
658 &'a self,
659 user: &'a UserCatalog,
660 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
661 self.index_by_name
662 .values()
663 .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
664 }
665
666 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
668 self.source_by_name.values()
669 }
670
671 pub fn iter_source_with_acl<'a>(
672 &'a self,
673 user: &'a UserCatalog,
674 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
675 self.source_by_name
676 .values()
677 .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
678 }
679
680 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
681 self.sink_by_name.values()
682 }
683
684 pub fn iter_sink_with_acl<'a>(
685 &'a self,
686 user: &'a UserCatalog,
687 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
688 self.sink_by_name
689 .values()
690 .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
691 }
692
693 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
694 self.subscription_by_name.values()
695 }
696
697 pub fn iter_subscription_with_acl<'a>(
698 &'a self,
699 user: &'a UserCatalog,
700 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
701 self.subscription_by_name.values().filter(|s| {
702 has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
703 })
704 }
705
706 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
707 self.view_by_name.values()
708 }
709
710 pub fn iter_view_with_acl<'a>(
711 &'a self,
712 user: &'a UserCatalog,
713 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
714 self.view_by_name
715 .values()
716 .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
717 }
718
719 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
720 self.function_by_name.values().flat_map(|v| v.values())
721 }
722
723 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
724 self.connection_by_name.values()
725 }
726
727 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
728 self.secret_by_name.values()
729 }
730
731 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
732 self.system_table_by_name.values()
733 }
734
735 pub fn get_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
736 self.table_by_name.get(table_name)
737 }
738
739 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
740 self.table_by_name
741 .get(table_name)
742 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
743 }
744
745 pub fn get_created_table_or_any_internal_table_by_name(
748 &self,
749 table_name: &str,
750 ) -> Option<&Arc<TableCatalog>> {
751 self.table_by_name.get(table_name).filter(|&table| {
752 table.stream_job_status == StreamJobStatus::Created || table.is_internal_table()
753 })
754 }
755
756 pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
757 self.table_by_id.get(table_id)
758 }
759
760 pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
761 self.table_by_id
762 .get(table_id)
763 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
764 }
765
766 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
767 self.view_by_name.get(view_name)
768 }
769
770 pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
771 self.view_by_id.get(view_id)
772 }
773
774 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
775 self.source_by_name.get(source_name)
776 }
777
778 pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
779 self.source_by_id.get(source_id)
780 }
781
782 pub fn get_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
783 self.sink_by_name.get(sink_name)
784 }
785
786 pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
787 self.sink_by_id.get(sink_id)
788 }
789
790 pub fn get_subscription_by_name(
791 &self,
792 subscription_name: &str,
793 ) -> Option<&Arc<SubscriptionCatalog>> {
794 self.subscription_by_name.get(subscription_name)
795 }
796
797 pub fn get_subscription_by_id(
798 &self,
799 subscription_id: &SubscriptionId,
800 ) -> Option<&Arc<SubscriptionCatalog>> {
801 self.subscription_by_id.get(subscription_id)
802 }
803
804 pub fn get_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
805 self.index_by_name.get(index_name)
806 }
807
808 pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
809 self.index_by_id.get(index_id)
810 }
811
812 pub fn get_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
813 self.indexes_by_table_id
814 .get(table_id)
815 .cloned()
816 .unwrap_or_default()
817 }
818
819 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
820 self.system_table_by_name.get(table_name)
821 }
822
823 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
824 self.table_by_id
825 .get(&table_id)
826 .map(|table| table.name.clone())
827 }
828
829 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
830 self.function_by_id.get(&function_id)
831 }
832
833 pub fn get_function_by_name_inputs(
834 &self,
835 name: &str,
836 inputs: &mut [ExprImpl],
837 ) -> Option<&Arc<FunctionCatalog>> {
838 infer_type_with_sigmap(
839 FuncName::Udf(name.to_owned()),
840 inputs,
841 &self.function_registry,
842 )
843 .ok()?;
844 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
845 self.function_by_name.get(name)?.get(&args)
846 }
847
848 pub fn get_function_by_name_args(
849 &self,
850 name: &str,
851 args: &[DataType],
852 ) -> Option<&Arc<FunctionCatalog>> {
853 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
854 let func = infer_type_name(
855 &self.function_registry,
856 FuncName::Udf(name.to_owned()),
857 &args,
858 )
859 .ok()?;
860
861 let args = func
862 .inputs_type
863 .iter()
864 .filter_map(|x| {
865 if let SigDataType::Exact(t) = x {
866 Some(t.clone())
867 } else {
868 None
869 }
870 })
871 .collect_vec();
872
873 self.function_by_name.get(name)?.get(&args)
874 }
875
876 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
877 let functions = self.function_by_name.get(name)?;
878 if functions.is_empty() {
879 return None;
880 }
881 Some(functions.values().collect())
882 }
883
884 pub fn get_connection_by_id(
885 &self,
886 connection_id: &ConnectionId,
887 ) -> Option<&Arc<ConnectionCatalog>> {
888 self.connection_by_id.get(connection_id)
889 }
890
891 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
892 self.connection_by_name.get(connection_name)
893 }
894
895 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
896 self.secret_by_name.get(secret_name)
897 }
898
899 pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
900 self.secret_by_id.get(secret_id)
901 }
902
903 pub fn get_source_ids_by_connection(
905 &self,
906 connection_id: ConnectionId,
907 ) -> Option<Vec<SourceId>> {
908 self.connection_source_ref
909 .get(&connection_id)
910 .map(|c| c.to_owned())
911 }
912
913 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
915 self.connection_sink_ref
916 .get(&connection_id)
917 .map(|s| s.to_owned())
918 }
919
920 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<Object> {
921 #[allow(clippy::manual_map)]
922 if self.get_created_table_by_id(&TableId::new(oid)).is_some()
923 || self.get_index_by_id(&IndexId::new(oid)).is_some()
924 {
925 Some(Object::TableId(oid))
926 } else if self.get_source_by_id(&oid).is_some() {
927 Some(Object::SourceId(oid))
928 } else if self.get_sink_by_id(&oid).is_some() {
929 Some(Object::SinkId(oid))
930 } else if self.get_view_by_id(&oid).is_some() {
931 Some(Object::ViewId(oid))
932 } else {
933 None
934 }
935 }
936
937 pub fn contains_object(&self, oid: u32) -> bool {
938 self.table_by_id.contains_key(&TableId::new(oid))
939 || self.index_by_id.contains_key(&IndexId::new(oid))
940 || self.source_by_id.contains_key(&oid)
941 || self.sink_by_id.contains_key(&oid)
942 || self.view_by_id.contains_key(&oid)
943 || self.function_by_id.contains_key(&FunctionId::new(oid))
944 || self.subscription_by_id.contains_key(&oid)
945 || self.connection_by_id.contains_key(&oid)
946 }
947
948 pub fn id(&self) -> SchemaId {
949 self.id
950 }
951
952 pub fn database_id(&self) -> DatabaseId {
953 self.database_id
954 }
955
956 pub fn name(&self) -> String {
957 self.name.clone()
958 }
959}
960
961impl OwnedByUserCatalog for SchemaCatalog {
962 fn owner(&self) -> UserId {
963 self.owner
964 }
965}
966
967impl From<&PbSchema> for SchemaCatalog {
968 fn from(schema: &PbSchema) -> Self {
969 Self {
970 id: schema.id,
971 owner: schema.owner,
972 name: schema.name.clone(),
973 database_id: schema.database_id,
974 table_by_name: HashMap::new(),
975 table_by_id: HashMap::new(),
976 source_by_name: HashMap::new(),
977 source_by_id: HashMap::new(),
978 sink_by_name: HashMap::new(),
979 sink_by_id: HashMap::new(),
980 index_by_name: HashMap::new(),
981 index_by_id: HashMap::new(),
982 indexes_by_table_id: HashMap::new(),
983 system_table_by_name: HashMap::new(),
984 view_by_name: HashMap::new(),
985 view_by_id: HashMap::new(),
986 function_registry: FunctionRegistry::default(),
987 function_by_name: HashMap::new(),
988 function_by_id: HashMap::new(),
989 connection_by_name: HashMap::new(),
990 connection_by_id: HashMap::new(),
991 secret_by_name: HashMap::new(),
992 secret_by_id: HashMap::new(),
993 _secret_source_ref: HashMap::new(),
994 _secret_sink_ref: HashMap::new(),
995 connection_source_ref: HashMap::new(),
996 connection_sink_ref: HashMap::new(),
997 subscription_by_name: HashMap::new(),
998 subscription_by_id: HashMap::new(),
999 }
1000 }
1001}