1use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47 id: SchemaId,
48 pub name: String,
49 pub database_id: DatabaseId,
50 table_by_name: HashMap<String, Arc<TableCatalog>>,
52 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54 source_by_name: HashMap<String, Arc<SourceCatalog>>,
55 source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56 sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57 sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58 table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60 subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61 subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62 index_by_name: HashMap<String, Arc<IndexCatalog>>,
63 index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64 indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65 view_by_name: HashMap<String, Arc<ViewCatalog>>,
66 view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67 function_registry: FunctionRegistry,
68 function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69 function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70 connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71 connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72 secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73 secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75 _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76 _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78 connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80 connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82 system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84 pub owner: u32,
85}
86
87impl SchemaCatalog {
88 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89 let name = prost.name.clone();
90 let id = prost.id;
91 let table: TableCatalog = prost.into();
92 let table_ref = Arc::new(table);
93
94 self.table_by_name
95 .try_insert(name, table_ref.clone())
96 .unwrap();
97 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98 table_ref
99 }
100
101 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102 self.system_table_by_name
103 .try_insert(sys_table.name.clone(), sys_table)
104 .unwrap();
105 }
106
107 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108 self.view_by_name
109 .try_insert(sys_view.name().to_owned(), sys_view.clone())
110 .unwrap();
111 self.view_by_id
112 .try_insert(sys_view.id, sys_view.clone())
113 .unwrap();
114 }
115
116 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117 let name = prost.name.clone();
118 let id = prost.id;
119 let table: TableCatalog = prost.into();
120 let table_ref = Arc::new(table);
121
122 let old_table = self.table_by_id.get(&id).unwrap();
123 if old_table.name() != name
125 && let Some(t) = self.table_by_name.get(old_table.name())
126 && t.id == id
127 {
128 self.table_by_name.remove(old_table.name());
129 }
130
131 self.table_by_name.insert(name, table_ref.clone());
132 self.table_by_id.insert(id, table_ref.clone());
133 table_ref
134 }
135
136 pub fn update_index(&mut self, prost: &PbIndex) {
137 let name = prost.name.clone();
138 let id = prost.id;
139 let old_index = self.index_by_id.get(&id).unwrap();
140 let index_table = self.get_created_table_by_id(prost.index_table_id).unwrap();
141 let primary_table = self
142 .get_created_table_by_id(prost.primary_table_id)
143 .unwrap();
144 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145 let index_ref = Arc::new(index);
146
147 if old_index.name != name
149 && let Some(idx) = self.index_by_name.get(&old_index.name)
150 && idx.id == id
151 {
152 self.index_by_name.remove(&old_index.name);
153 }
154 self.index_by_name.insert(name, index_ref.clone());
155 self.index_by_id.insert(id, index_ref.clone());
156
157 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158 Occupied(mut entry) => {
159 let pos = entry
160 .get()
161 .iter()
162 .position(|x| x.id == index_ref.id)
163 .unwrap();
164 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165 }
166 Vacant(_entry) => {
167 unreachable!()
168 }
169 };
170 }
171
172 pub fn drop_table(&mut self, id: TableId) {
173 if let Some(table_ref) = self.table_by_id.remove(&id) {
174 self.table_by_name.remove(&table_ref.name).unwrap();
175 self.indexes_by_table_id.remove(&table_ref.id);
176 } else {
177 tracing::warn!(
178 %id,
179 "table not found when dropping, frontend might not be notified yet"
180 );
181 }
182 }
183
184 pub fn create_index(&mut self, prost: &PbIndex) {
185 let name = prost.name.clone();
186 let id = prost.id;
187 let index_table = self.get_table_by_id(prost.index_table_id).unwrap();
188 let primary_table = self
189 .get_created_table_by_id(prost.primary_table_id)
190 .unwrap();
191 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192 let index_ref = Arc::new(index);
193
194 self.index_by_name
195 .try_insert(name, index_ref.clone())
196 .unwrap();
197 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199 Occupied(mut entry) => {
200 entry.get_mut().push(index_ref);
201 }
202 Vacant(entry) => {
203 entry.insert(vec![index_ref]);
204 }
205 };
206 }
207
208 pub fn drop_index(&mut self, id: IndexId) {
209 let index_ref = self.index_by_id.remove(&id).unwrap();
210 self.index_by_name.remove(&index_ref.name).unwrap();
211 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212 Occupied(mut entry) => {
213 let pos = entry
214 .get_mut()
215 .iter()
216 .position(|x| x.id == index_ref.id)
217 .unwrap();
218 entry.get_mut().remove(pos);
219 }
220 Vacant(_entry) => (),
221 };
222 }
223
224 pub fn create_source(&mut self, prost: &PbSource) {
225 let name = prost.name.clone();
226 let id = prost.id;
227 let source = SourceCatalog::from(prost);
228 let source_ref = Arc::new(source);
229
230 if let Some(connection_id) = source_ref.connection_id {
231 self.connection_source_ref
232 .entry(connection_id)
233 .and_modify(|sources| sources.push(source_ref.id))
234 .or_insert(vec![source_ref.id]);
235 }
236
237 self.source_by_name
238 .try_insert(name, source_ref.clone())
239 .unwrap();
240 self.source_by_id.try_insert(id, source_ref).unwrap();
241 }
242
243 pub fn drop_source(&mut self, id: SourceId) {
244 let source_ref = self.source_by_id.remove(&id).unwrap();
245 self.source_by_name.remove(&source_ref.name).unwrap();
246 if let Some(connection_id) = source_ref.connection_id
247 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248 {
249 let source_ids = e.get_mut();
250 source_ids.retain_mut(|sid| *sid != id);
251 if source_ids.is_empty() {
252 e.remove_entry();
253 }
254 }
255 }
256
257 pub fn update_source(&mut self, prost: &PbSource) {
258 let name = prost.name.clone();
259 let id = prost.id;
260 let source = SourceCatalog::from(prost);
261 let source_ref = Arc::new(source);
262
263 let old_source = self.source_by_id.get(&id).unwrap();
264 if old_source.name != name
266 && let Some(src) = self.source_by_name.get(&old_source.name)
267 && src.id == id
268 {
269 self.source_by_name.remove(&old_source.name);
270 }
271
272 self.source_by_name.insert(name, source_ref.clone());
273 self.source_by_id.insert(id, source_ref);
274 }
275
276 pub fn create_sink(&mut self, prost: &PbSink) {
277 let name = prost.name.clone();
278 let id = prost.id;
279 let sink = SinkCatalog::from(prost);
280 let sink_ref = Arc::new(sink);
281
282 if let Some(connection_id) = sink_ref.connection_id {
283 self.connection_sink_ref
284 .entry(connection_id)
285 .and_modify(|sinks| sinks.push(id))
286 .or_insert(vec![id]);
287 }
288
289 if let Some(target_table) = sink_ref.target_table {
290 assert!(
291 self.table_incoming_sinks
292 .entry(target_table)
293 .or_default()
294 .insert(sink_ref.id)
295 );
296 }
297
298 self.sink_by_name
299 .try_insert(name, sink_ref.clone())
300 .unwrap();
301 self.sink_by_id.try_insert(id, sink_ref).unwrap();
302 }
303
304 pub fn drop_sink(&mut self, id: SinkId) {
305 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
306 self.sink_by_name.remove(&sink_ref.name).unwrap();
307 if let Some(connection_id) = sink_ref.connection_id
308 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id)
309 {
310 let sink_ids = e.get_mut();
311 sink_ids.retain_mut(|sid| *sid != id);
312 if sink_ids.is_empty() {
313 e.remove_entry();
314 }
315 }
316 if let Some(target_table) = sink_ref.target_table {
317 let incoming_sinks = self
318 .table_incoming_sinks
319 .get_mut(&target_table)
320 .expect("should exists");
321 assert!(incoming_sinks.remove(&sink_ref.id));
322 if incoming_sinks.is_empty() {
323 self.table_incoming_sinks.remove(&target_table);
324 }
325 }
326 } else {
327 tracing::warn!(
328 %id,
329 "sink not found when dropping, frontend might not be notified yet"
330 );
331 }
332 }
333
334 pub fn update_sink(&mut self, prost: &PbSink) {
335 let name = prost.name.clone();
336 let id = prost.id;
337 let sink = SinkCatalog::from(prost);
338 let sink_ref = Arc::new(sink);
339
340 let old_sink = self.sink_by_id.get(&id).unwrap();
341 assert_eq!(sink_ref.target_table, old_sink.target_table);
342 if old_sink.name != name
344 && let Some(s) = self.sink_by_name.get(&old_sink.name)
345 && s.id == id
346 {
347 self.sink_by_name.remove(&old_sink.name);
348 }
349
350 self.sink_by_name.insert(name, sink_ref.clone());
351 self.sink_by_id.insert(id, sink_ref);
352 }
353
354 pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
355 self.table_incoming_sinks.get(&table_id)
356 }
357
358 pub fn create_subscription(&mut self, prost: &PbSubscription) {
359 let name = prost.name.clone();
360 let id = prost.id;
361 let subscription_catalog = SubscriptionCatalog::from(prost);
362 let subscription_ref = Arc::new(subscription_catalog);
363
364 self.subscription_by_name
365 .try_insert(name, subscription_ref.clone())
366 .unwrap();
367 self.subscription_by_id
368 .try_insert(id, subscription_ref)
369 .unwrap();
370 }
371
372 pub fn drop_subscription(&mut self, id: SubscriptionId) {
373 let subscription_ref = self.subscription_by_id.remove(&id);
374 if let Some(subscription_ref) = subscription_ref {
375 self.subscription_by_name.remove(&subscription_ref.name);
376 }
377 }
378
379 pub fn update_subscription(&mut self, prost: &PbSubscription) {
380 let name = prost.name.clone();
381 let id = prost.id;
382 let subscription = SubscriptionCatalog::from(prost);
383 let subscription_ref = Arc::new(subscription);
384
385 let old_subscription = self.subscription_by_id.get(&id).unwrap();
386 if old_subscription.name != name
388 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
389 && s.id == id
390 {
391 self.subscription_by_name.remove(&old_subscription.name);
392 }
393
394 self.subscription_by_name
395 .insert(name, subscription_ref.clone());
396 self.subscription_by_id.insert(id, subscription_ref);
397 }
398
399 pub fn create_view(&mut self, prost: &PbView) {
400 let name = prost.name.clone();
401 let id = prost.id;
402 let view = ViewCatalog::from(prost);
403 let view_ref = Arc::new(view);
404
405 self.view_by_name
406 .try_insert(name, view_ref.clone())
407 .unwrap();
408 self.view_by_id.try_insert(id, view_ref).unwrap();
409 }
410
411 pub fn drop_view(&mut self, id: ViewId) {
412 let view_ref = self.view_by_id.remove(&id).unwrap();
413 self.view_by_name.remove(&view_ref.name).unwrap();
414 }
415
416 pub fn update_view(&mut self, prost: &PbView) {
417 let name = prost.name.clone();
418 let id = prost.id;
419 let view = ViewCatalog::from(prost);
420 let view_ref = Arc::new(view);
421
422 let old_view = self.view_by_id.get(&id).unwrap();
423 if old_view.name != name
425 && let Some(v) = self.view_by_name.get(old_view.name())
426 && v.id == id
427 {
428 self.view_by_name.remove(&old_view.name);
429 }
430
431 self.view_by_name.insert(name, view_ref.clone());
432 self.view_by_id.insert(id, view_ref);
433 }
434
435 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
436 FuncSign {
437 name: FuncName::Udf(func.name.clone()),
438 inputs_type: func
439 .arg_types
440 .iter()
441 .map(|t| t.clone().into())
442 .collect_vec(),
443 variadic: false,
444 ret_type: func.return_type.clone().into(),
445 build: FuncBuilder::Udf,
446 type_infer: |_| Ok(DataType::Boolean),
448 deprecated: false,
449 }
450 }
451
452 pub fn create_function(&mut self, prost: &PbFunction) {
453 let name = prost.name.clone();
454 let id = prost.id;
455 let function = FunctionCatalog::from(prost);
456 let args = function.arg_types.clone();
457 let function_ref = Arc::new(function);
458
459 self.function_registry
460 .insert(Self::get_func_sign(&function_ref));
461 self.function_by_name
462 .entry(name)
463 .or_default()
464 .try_insert(args, function_ref.clone())
465 .expect("function already exists with same argument types");
466 self.function_by_id
467 .try_insert(id, function_ref)
468 .expect("function id exists");
469 }
470
471 pub fn drop_function(&mut self, id: FunctionId) {
472 let function_ref = self
473 .function_by_id
474 .remove(&id)
475 .expect("function not found by id");
476
477 self.function_registry
478 .remove(Self::get_func_sign(&function_ref))
479 .expect("function not found in registry");
480
481 self.function_by_name
482 .get_mut(&function_ref.name)
483 .expect("function not found by name")
484 .remove(&function_ref.arg_types)
485 .expect("function not found by argument types");
486 }
487
488 pub fn update_function(&mut self, prost: &PbFunction) {
489 let name = prost.name.clone();
490 let id = prost.id;
491 let function = FunctionCatalog::from(prost);
492 let function_ref = Arc::new(function);
493
494 let old_function_by_id = self.function_by_id.get(&id).unwrap();
495 let old_function_by_name = self
496 .function_by_name
497 .get_mut(&old_function_by_id.name)
498 .unwrap();
499 if old_function_by_id.name != name
501 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
502 && f.id == id
503 {
504 old_function_by_name.remove(&old_function_by_id.arg_types);
505 if old_function_by_name.is_empty() {
506 self.function_by_name.remove(&old_function_by_id.name);
507 }
508 }
509
510 self.function_by_name
511 .entry(name)
512 .or_default()
513 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
514 self.function_by_id.insert(id, function_ref);
515 }
516
517 pub fn create_connection(&mut self, prost: &PbConnection) {
518 let name = prost.name.clone();
519 let id = prost.id;
520 let connection = ConnectionCatalog::from(prost);
521 let connection_ref = Arc::new(connection);
522 self.connection_by_name
523 .try_insert(name, connection_ref.clone())
524 .unwrap();
525 self.connection_by_id
526 .try_insert(id, connection_ref)
527 .unwrap();
528 }
529
530 pub fn update_connection(&mut self, prost: &PbConnection) {
531 let name = prost.name.clone();
532 let id = prost.id;
533 let connection = ConnectionCatalog::from(prost);
534 let connection_ref = Arc::new(connection);
535
536 let old_connection = self.connection_by_id.get(&id).unwrap();
537 if old_connection.name != name
539 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
540 && conn.id == id
541 {
542 self.connection_by_name.remove(&old_connection.name);
543 }
544
545 self.connection_by_name.insert(name, connection_ref.clone());
546 self.connection_by_id.insert(id, connection_ref);
547 }
548
549 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
550 let connection_ref = self
551 .connection_by_id
552 .remove(&connection_id)
553 .expect("connection not found by id");
554 self.connection_by_name
555 .remove(&connection_ref.name)
556 .expect("connection not found by name");
557 }
558
559 pub fn create_secret(&mut self, prost: &PbSecret) {
560 let name = prost.name.clone();
561 let id = prost.id;
562 let secret = SecretCatalog::from(prost);
563 let secret_ref = Arc::new(secret);
564
565 self.secret_by_id
566 .try_insert(id, secret_ref.clone())
567 .unwrap();
568 self.secret_by_name.try_insert(name, secret_ref).unwrap();
569 }
570
571 pub fn update_secret(&mut self, prost: &PbSecret) {
572 let name = prost.name.clone();
573 let id = prost.id;
574 let secret = SecretCatalog::from(prost);
575 let secret_ref = Arc::new(secret);
576
577 let old_secret = self.secret_by_id.get(&id).unwrap();
578 if old_secret.name != name
580 && let Some(s) = self.secret_by_name.get(&old_secret.name)
581 && s.id == id
582 {
583 self.secret_by_name.remove(&old_secret.name);
584 }
585
586 self.secret_by_name.insert(name, secret_ref.clone());
587 self.secret_by_id.insert(id, secret_ref);
588 }
589
590 pub fn drop_secret(&mut self, secret_id: SecretId) {
591 let secret_ref = self
592 .secret_by_id
593 .remove(&secret_id)
594 .expect("secret not found by id");
595 self.secret_by_name
596 .remove(&secret_ref.name)
597 .expect("secret not found by name");
598 }
599
600 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
601 self.table_by_name.values()
602 }
603
604 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
605 self.table_by_name.values().filter(|v| v.is_user_table())
606 }
607
608 pub fn iter_user_table_with_acl<'a>(
609 &'a self,
610 user: &'a UserCatalog,
611 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
612 self.table_by_name
613 .values()
614 .filter(|v| v.is_user_table() && has_access_to_object(user, v.id, v.owner))
615 }
616
617 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
618 self.table_by_name
619 .values()
620 .filter(|v| v.is_internal_table())
621 }
622
623 pub fn iter_internal_table_with_acl<'a>(
624 &'a self,
625 user: &'a UserCatalog,
626 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
627 self.table_by_name
628 .values()
629 .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
630 }
631
632 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
634 self.table_by_name
635 .values()
636 .filter(|v| !v.is_internal_table())
637 }
638
639 pub fn iter_table_mv_indices_with_acl<'a>(
640 &'a self,
641 user: &'a UserCatalog,
642 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
643 self.table_by_name
644 .values()
645 .filter(|v| !v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
646 }
647
648 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
650 self.table_by_name.values().filter(|v| v.is_mview())
651 }
652
653 pub fn iter_all_mvs_with_acl<'a>(
654 &'a self,
655 user: &'a UserCatalog,
656 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
657 self.table_by_name
658 .values()
659 .filter(|v| v.is_mview() && has_access_to_object(user, v.id, v.owner))
660 }
661
662 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
664 self.table_by_name
665 .values()
666 .filter(|v| v.is_mview() && v.is_created())
667 }
668
669 pub fn iter_created_mvs_with_acl<'a>(
670 &'a self,
671 user: &'a UserCatalog,
672 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
673 self.table_by_name
674 .values()
675 .filter(|v| v.is_mview() && v.is_created() && has_access_to_object(user, v.id, v.owner))
676 }
677
678 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
680 self.index_by_name.values()
681 }
682
683 pub fn iter_index_with_acl<'a>(
684 &'a self,
685 user: &'a UserCatalog,
686 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
687 self.index_by_name
688 .values()
689 .filter(|idx| has_access_to_object(user, idx.id, idx.owner()))
690 }
691
692 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
694 self.source_by_name.values()
695 }
696
697 pub fn iter_source_with_acl<'a>(
698 &'a self,
699 user: &'a UserCatalog,
700 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
701 self.source_by_name
702 .values()
703 .filter(|s| has_access_to_object(user, s.id, s.owner))
704 }
705
706 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
707 self.sink_by_name.values()
708 }
709
710 pub fn iter_sink_with_acl<'a>(
711 &'a self,
712 user: &'a UserCatalog,
713 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
714 self.sink_by_name
715 .values()
716 .filter(|s| has_access_to_object(user, s.id, s.owner.user_id))
717 }
718
719 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
720 self.subscription_by_name.values()
721 }
722
723 pub fn iter_subscription_with_acl<'a>(
724 &'a self,
725 user: &'a UserCatalog,
726 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
727 self.subscription_by_name
728 .values()
729 .filter(|s| has_access_to_object(user, s.id, s.owner.user_id))
730 }
731
732 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
733 self.view_by_name.values()
734 }
735
736 pub fn iter_view_with_acl<'a>(
737 &'a self,
738 user: &'a UserCatalog,
739 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
740 self.view_by_name
741 .values()
742 .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
743 }
744
745 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
746 self.function_by_name.values().flat_map(|v| v.values())
747 }
748
749 pub fn iter_function_with_acl<'a>(
750 &'a self,
751 user: &'a UserCatalog,
752 ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
753 self.function_by_name
754 .values()
755 .flat_map(|v| v.values())
756 .filter(|f| has_access_to_object(user, f.id, f.owner))
757 }
758
759 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
760 self.connection_by_name.values()
761 }
762
763 pub fn iter_connections_with_acl<'a>(
764 &'a self,
765 user: &'a UserCatalog,
766 ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
767 self.connection_by_name
768 .values()
769 .filter(|c| has_access_to_object(user, c.id, c.owner))
770 }
771
772 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
773 self.secret_by_name.values()
774 }
775
776 pub fn iter_secret_with_acl<'a>(
777 &'a self,
778 user: &'a UserCatalog,
779 ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
780 self.secret_by_name
781 .values()
782 .filter(|s| has_access_to_object(user, s.id, s.owner))
783 }
784
785 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
786 self.system_table_by_name.values()
787 }
788
789 pub fn get_table_by_name(
790 &self,
791 table_name: &str,
792 bind_creating_relations: bool,
793 ) -> Option<&Arc<TableCatalog>> {
794 self.table_by_name
795 .get(table_name)
796 .filter(|&table| bind_creating_relations || table.is_created())
797 }
798
799 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
800 self.get_table_by_name(table_name, true)
801 }
802
803 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
804 self.get_table_by_name(table_name, false)
805 }
806
807 pub fn get_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
808 self.table_by_id.get(&table_id)
809 }
810
811 pub fn get_created_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
812 self.table_by_id
813 .get(&table_id)
814 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
815 }
816
817 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
818 self.view_by_name.get(view_name)
819 }
820
821 pub fn get_view_by_id(&self, view_id: ViewId) -> Option<&Arc<ViewCatalog>> {
822 self.view_by_id.get(&view_id)
823 }
824
825 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
826 self.source_by_name.get(source_name)
827 }
828
829 pub fn get_source_by_id(&self, source_id: SourceId) -> Option<&Arc<SourceCatalog>> {
830 self.source_by_id.get(&source_id)
831 }
832
833 pub fn get_sink_by_name(
834 &self,
835 sink_name: &str,
836 bind_creating: bool,
837 ) -> Option<&Arc<SinkCatalog>> {
838 self.sink_by_name
839 .get(sink_name)
840 .filter(|s| bind_creating || s.is_created())
841 }
842
843 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
844 self.get_sink_by_name(sink_name, true)
845 }
846
847 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
848 self.get_sink_by_name(sink_name, false)
849 }
850
851 pub fn get_sink_by_id(&self, sink_id: SinkId) -> Option<&Arc<SinkCatalog>> {
852 self.sink_by_id.get(&sink_id)
853 }
854
855 pub fn get_subscription_by_name(
856 &self,
857 subscription_name: &str,
858 ) -> Option<&Arc<SubscriptionCatalog>> {
859 self.subscription_by_name.get(subscription_name)
860 }
861
862 pub fn get_subscription_by_id(
863 &self,
864 subscription_id: SubscriptionId,
865 ) -> Option<&Arc<SubscriptionCatalog>> {
866 self.subscription_by_id.get(&subscription_id)
867 }
868
869 pub fn get_index_by_name(
870 &self,
871 index_name: &str,
872 bind_creating: bool,
873 ) -> Option<&Arc<IndexCatalog>> {
874 self.index_by_name
875 .get(index_name)
876 .filter(|i| bind_creating || i.is_created())
877 }
878
879 pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
880 self.get_index_by_name(index_name, true)
881 }
882
883 pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
884 self.get_index_by_name(index_name, false)
885 }
886
887 pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&Arc<IndexCatalog>> {
888 self.index_by_id.get(&index_id)
889 }
890
891 pub fn get_indexes_by_table_id(
892 &self,
893 table_id: TableId,
894 include_creating: bool,
895 ) -> Vec<Arc<IndexCatalog>> {
896 self.indexes_by_table_id
897 .get(&table_id)
898 .cloned()
899 .unwrap_or_default()
900 .into_iter()
901 .filter(|i| include_creating || i.is_created())
902 .collect()
903 }
904
905 pub fn get_any_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
906 self.get_indexes_by_table_id(table_id, true)
907 }
908
909 pub fn get_created_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
910 self.get_indexes_by_table_id(table_id, false)
911 }
912
913 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
914 self.system_table_by_name.get(table_name)
915 }
916
917 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
918 self.table_by_id
919 .get(&table_id)
920 .map(|table| table.name.clone())
921 }
922
923 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
924 self.function_by_id.get(&function_id)
925 }
926
927 pub fn get_function_by_name_inputs(
928 &self,
929 name: &str,
930 inputs: &mut [ExprImpl],
931 ) -> Option<&Arc<FunctionCatalog>> {
932 infer_type_with_sigmap(
933 FuncName::Udf(name.to_owned()),
934 inputs,
935 &self.function_registry,
936 )
937 .ok()?;
938 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
939 self.function_by_name.get(name)?.get(&args)
940 }
941
942 pub fn get_function_by_name_args(
943 &self,
944 name: &str,
945 args: &[DataType],
946 ) -> Option<&Arc<FunctionCatalog>> {
947 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
948 let func = infer_type_name(
949 &self.function_registry,
950 FuncName::Udf(name.to_owned()),
951 &args,
952 )
953 .ok()?;
954
955 let args = func
956 .inputs_type
957 .iter()
958 .filter_map(|x| {
959 if let SigDataType::Exact(t) = x {
960 Some(t.clone())
961 } else {
962 None
963 }
964 })
965 .collect_vec();
966
967 self.function_by_name.get(name)?.get(&args)
968 }
969
970 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
971 let functions = self.function_by_name.get(name)?;
972 if functions.is_empty() {
973 return None;
974 }
975 Some(functions.values().collect())
976 }
977
978 pub fn get_connection_by_id(
979 &self,
980 connection_id: ConnectionId,
981 ) -> Option<&Arc<ConnectionCatalog>> {
982 self.connection_by_id.get(&connection_id)
983 }
984
985 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
986 self.connection_by_name.get(connection_name)
987 }
988
989 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
990 self.secret_by_name.get(secret_name)
991 }
992
993 pub fn get_secret_by_id(&self, secret_id: SecretId) -> Option<&Arc<SecretCatalog>> {
994 self.secret_by_id.get(&secret_id)
995 }
996
997 pub fn get_source_ids_by_connection(
999 &self,
1000 connection_id: ConnectionId,
1001 ) -> Option<Vec<SourceId>> {
1002 self.connection_source_ref
1003 .get(&connection_id)
1004 .map(|c| c.to_owned())
1005 }
1006
1007 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1009 self.connection_sink_ref
1010 .get(&connection_id)
1011 .map(|s| s.to_owned())
1012 }
1013
1014 pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
1015 #[allow(clippy::manual_map)]
1016 if let Some(table) = self.get_created_table_by_id(oid.as_table_id()) {
1017 Some(OwnedGrantObject {
1018 owner: table.owner,
1019 object: Object::TableId(oid.as_raw_id()),
1020 })
1021 } else if let Some(index) = self.get_index_by_id(oid.as_index_id()) {
1022 Some(OwnedGrantObject {
1023 owner: index.owner(),
1024 object: Object::TableId(oid.as_raw_id()),
1025 })
1026 } else if let Some(source) = self.get_source_by_id(oid.as_source_id()) {
1027 Some(OwnedGrantObject {
1028 owner: source.owner,
1029 object: Object::SourceId(oid.as_raw_id()),
1030 })
1031 } else if let Some(sink) = self.get_sink_by_id(oid.as_sink_id()) {
1032 Some(OwnedGrantObject {
1033 owner: sink.owner.user_id,
1034 object: Object::SinkId(oid.as_raw_id()),
1035 })
1036 } else if let Some(view) = self.get_view_by_id(oid.as_view_id()) {
1037 Some(OwnedGrantObject {
1038 owner: view.owner,
1039 object: Object::ViewId(oid.as_raw_id()),
1040 })
1041 } else if let Some(function) = self.get_function_by_id(oid.as_function_id()) {
1042 Some(OwnedGrantObject {
1043 owner: function.owner(),
1044 object: Object::FunctionId(oid.as_raw_id()),
1045 })
1046 } else if let Some(subscription) = self.get_subscription_by_id(oid.as_subscription_id()) {
1047 Some(OwnedGrantObject {
1048 owner: subscription.owner.user_id,
1049 object: Object::SubscriptionId(oid.as_raw_id()),
1050 })
1051 } else if let Some(connection) = self.get_connection_by_id(oid.as_connection_id()) {
1052 Some(OwnedGrantObject {
1053 owner: connection.owner,
1054 object: Object::ConnectionId(oid.as_raw_id()),
1055 })
1056 } else if let Some(secret) = self.get_secret_by_id(oid.as_secret_id()) {
1057 Some(OwnedGrantObject {
1058 owner: secret.owner,
1059 object: Object::SecretId(oid.as_raw_id()),
1060 })
1061 } else {
1062 None
1063 }
1064 }
1065
1066 pub fn contains_object(&self, oid: ObjectId) -> bool {
1067 self.table_by_id.contains_key(&oid.as_table_id())
1068 || self.index_by_id.contains_key(&oid.as_index_id())
1069 || self.source_by_id.contains_key(&oid.as_source_id())
1070 || self.sink_by_id.contains_key(&oid.as_sink_id())
1071 || self.view_by_id.contains_key(&oid.as_view_id())
1072 || self.function_by_id.contains_key(&oid.as_function_id())
1073 || self
1074 .subscription_by_id
1075 .contains_key(&oid.as_subscription_id())
1076 || self.connection_by_id.contains_key(&oid.as_connection_id())
1077 }
1078
1079 pub fn id(&self) -> SchemaId {
1080 self.id
1081 }
1082
1083 pub fn database_id(&self) -> DatabaseId {
1084 self.database_id
1085 }
1086
1087 pub fn name(&self) -> String {
1088 self.name.clone()
1089 }
1090}
1091
1092impl OwnedByUserCatalog for SchemaCatalog {
1093 fn owner(&self) -> UserId {
1094 self.owner
1095 }
1096}
1097
1098impl From<&PbSchema> for SchemaCatalog {
1099 fn from(schema: &PbSchema) -> Self {
1100 Self {
1101 id: schema.id,
1102 owner: schema.owner,
1103 name: schema.name.clone(),
1104 database_id: schema.database_id,
1105 table_by_name: HashMap::new(),
1106 table_by_id: HashMap::new(),
1107 source_by_name: HashMap::new(),
1108 source_by_id: HashMap::new(),
1109 sink_by_name: HashMap::new(),
1110 sink_by_id: HashMap::new(),
1111 table_incoming_sinks: HashMap::new(),
1112 index_by_name: HashMap::new(),
1113 index_by_id: HashMap::new(),
1114 indexes_by_table_id: HashMap::new(),
1115 system_table_by_name: HashMap::new(),
1116 view_by_name: HashMap::new(),
1117 view_by_id: HashMap::new(),
1118 function_registry: FunctionRegistry::default(),
1119 function_by_name: HashMap::new(),
1120 function_by_id: HashMap::new(),
1121 connection_by_name: HashMap::new(),
1122 connection_by_id: HashMap::new(),
1123 secret_by_name: HashMap::new(),
1124 secret_by_id: HashMap::new(),
1125 _secret_source_ref: HashMap::new(),
1126 _secret_sink_ref: HashMap::new(),
1127 connection_source_ref: HashMap::new(),
1128 connection_sink_ref: HashMap::new(),
1129 subscription_by_name: HashMap::new(),
1130 subscription_by_id: HashMap::new(),
1131 }
1132 }
1133}