1use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47 id: SchemaId,
48 pub name: String,
49 pub database_id: DatabaseId,
50 table_by_name: HashMap<String, Arc<TableCatalog>>,
52 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54 source_by_name: HashMap<String, Arc<SourceCatalog>>,
55 source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56 sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57 sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58 table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60 subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61 subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62 index_by_name: HashMap<String, Arc<IndexCatalog>>,
63 index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64 indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65 view_by_name: HashMap<String, Arc<ViewCatalog>>,
66 view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67 function_registry: FunctionRegistry,
68 function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69 function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70 connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71 connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72 secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73 secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75 _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76 _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78 connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80 connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82 system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84 pub owner: u32,
85}
86
87impl SchemaCatalog {
88 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89 let name = prost.name.clone();
90 let id = prost.id.into();
91 let table: TableCatalog = prost.into();
92 let table_ref = Arc::new(table);
93
94 self.table_by_name
95 .try_insert(name, table_ref.clone())
96 .unwrap();
97 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98 table_ref
99 }
100
101 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102 self.system_table_by_name
103 .try_insert(sys_table.name.clone(), sys_table)
104 .unwrap();
105 }
106
107 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108 self.view_by_name
109 .try_insert(sys_view.name().to_owned(), sys_view.clone())
110 .unwrap();
111 self.view_by_id
112 .try_insert(sys_view.id, sys_view.clone())
113 .unwrap();
114 }
115
116 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117 let name = prost.name.clone();
118 let id = prost.id.into();
119 let table: TableCatalog = prost.into();
120 let table_ref = Arc::new(table);
121
122 let old_table = self.table_by_id.get(&id).unwrap();
123 if old_table.name() != name
125 && let Some(t) = self.table_by_name.get(old_table.name())
126 && t.id == id
127 {
128 self.table_by_name.remove(old_table.name());
129 }
130
131 self.table_by_name.insert(name, table_ref.clone());
132 self.table_by_id.insert(id, table_ref.clone());
133 table_ref
134 }
135
136 pub fn update_index(&mut self, prost: &PbIndex) {
137 let name = prost.name.clone();
138 let id = prost.id.into();
139 let old_index = self.index_by_id.get(&id).unwrap();
140 let index_table = self
141 .get_created_table_by_id(&prost.index_table_id.into())
142 .unwrap();
143 let primary_table = self
144 .get_created_table_by_id(&prost.primary_table_id.into())
145 .unwrap();
146 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
147 let index_ref = Arc::new(index);
148
149 if old_index.name != name
151 && let Some(idx) = self.index_by_name.get(&old_index.name)
152 && idx.id == id
153 {
154 self.index_by_name.remove(&old_index.name);
155 }
156 self.index_by_name.insert(name, index_ref.clone());
157 self.index_by_id.insert(id, index_ref.clone());
158
159 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
160 Occupied(mut entry) => {
161 let pos = entry
162 .get()
163 .iter()
164 .position(|x| x.id == index_ref.id)
165 .unwrap();
166 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
167 }
168 Vacant(_entry) => {
169 unreachable!()
170 }
171 };
172 }
173
174 pub fn drop_table(&mut self, id: TableId) {
175 if let Some(table_ref) = self.table_by_id.remove(&id) {
176 self.table_by_name.remove(&table_ref.name).unwrap();
177 self.indexes_by_table_id.remove(&table_ref.id);
178 } else {
179 tracing::warn!(
180 id = ?id.table_id,
181 "table not found when dropping, frontend might not be notified yet"
182 );
183 }
184 }
185
186 pub fn create_index(&mut self, prost: &PbIndex) {
187 let name = prost.name.clone();
188 let id = prost.id.into();
189 let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
190 let primary_table = self
191 .get_created_table_by_id(&prost.primary_table_id.into())
192 .unwrap();
193 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
194 let index_ref = Arc::new(index);
195
196 self.index_by_name
197 .try_insert(name, index_ref.clone())
198 .unwrap();
199 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
200 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
201 Occupied(mut entry) => {
202 entry.get_mut().push(index_ref);
203 }
204 Vacant(entry) => {
205 entry.insert(vec![index_ref]);
206 }
207 };
208 }
209
210 pub fn drop_index(&mut self, id: IndexId) {
211 let index_ref = self.index_by_id.remove(&id).unwrap();
212 self.index_by_name.remove(&index_ref.name).unwrap();
213 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
214 Occupied(mut entry) => {
215 let pos = entry
216 .get_mut()
217 .iter()
218 .position(|x| x.id == index_ref.id)
219 .unwrap();
220 entry.get_mut().remove(pos);
221 }
222 Vacant(_entry) => (),
223 };
224 }
225
226 pub fn create_source(&mut self, prost: &PbSource) {
227 let name = prost.name.clone();
228 let id = prost.id;
229 let source = SourceCatalog::from(prost);
230 let source_ref = Arc::new(source);
231
232 if let Some(connection_id) = source_ref.connection_id {
233 self.connection_source_ref
234 .entry(connection_id)
235 .and_modify(|sources| sources.push(source_ref.id))
236 .or_insert(vec![source_ref.id]);
237 }
238
239 self.source_by_name
240 .try_insert(name, source_ref.clone())
241 .unwrap();
242 self.source_by_id.try_insert(id, source_ref).unwrap();
243 }
244
245 pub fn drop_source(&mut self, id: SourceId) {
246 let source_ref = self.source_by_id.remove(&id).unwrap();
247 self.source_by_name.remove(&source_ref.name).unwrap();
248 if let Some(connection_id) = source_ref.connection_id
249 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
250 {
251 let source_ids = e.get_mut();
252 source_ids.retain_mut(|sid| *sid != id);
253 if source_ids.is_empty() {
254 e.remove_entry();
255 }
256 }
257 }
258
259 pub fn update_source(&mut self, prost: &PbSource) {
260 let name = prost.name.clone();
261 let id = prost.id;
262 let source = SourceCatalog::from(prost);
263 let source_ref = Arc::new(source);
264
265 let old_source = self.source_by_id.get(&id).unwrap();
266 if old_source.name != name
268 && let Some(src) = self.source_by_name.get(&old_source.name)
269 && src.id == id
270 {
271 self.source_by_name.remove(&old_source.name);
272 }
273
274 self.source_by_name.insert(name, source_ref.clone());
275 self.source_by_id.insert(id, source_ref);
276 }
277
278 pub fn create_sink(&mut self, prost: &PbSink) {
279 let name = prost.name.clone();
280 let id = prost.id;
281 let sink = SinkCatalog::from(prost);
282 let sink_ref = Arc::new(sink);
283
284 if let Some(connection_id) = sink_ref.connection_id {
285 self.connection_sink_ref
286 .entry(connection_id.0)
287 .and_modify(|sinks| sinks.push(id))
288 .or_insert(vec![id]);
289 }
290
291 if let Some(target_table) = sink_ref.target_table {
292 assert!(
293 self.table_incoming_sinks
294 .entry(target_table)
295 .or_default()
296 .insert(sink_ref.id.sink_id)
297 );
298 }
299
300 self.sink_by_name
301 .try_insert(name, sink_ref.clone())
302 .unwrap();
303 self.sink_by_id.try_insert(id, sink_ref).unwrap();
304 }
305
306 pub fn drop_sink(&mut self, id: SinkId) {
307 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
308 self.sink_by_name.remove(&sink_ref.name).unwrap();
309 if let Some(connection_id) = sink_ref.connection_id
310 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
311 {
312 let sink_ids = e.get_mut();
313 sink_ids.retain_mut(|sid| *sid != id);
314 if sink_ids.is_empty() {
315 e.remove_entry();
316 }
317 }
318 if let Some(target_table) = sink_ref.target_table {
319 let incoming_sinks = self
320 .table_incoming_sinks
321 .get_mut(&target_table)
322 .expect("should exists");
323 assert!(incoming_sinks.remove(&sink_ref.id.sink_id));
324 if incoming_sinks.is_empty() {
325 self.table_incoming_sinks.remove(&target_table);
326 }
327 }
328 } else {
329 tracing::warn!(
330 id,
331 "sink not found when dropping, frontend might not be notified yet"
332 );
333 }
334 }
335
336 pub fn update_sink(&mut self, prost: &PbSink) {
337 let name = prost.name.clone();
338 let id = prost.id;
339 let sink = SinkCatalog::from(prost);
340 let sink_ref = Arc::new(sink);
341
342 let old_sink = self.sink_by_id.get(&id).unwrap();
343 assert_eq!(sink_ref.target_table, old_sink.target_table);
344 if old_sink.name != name
346 && let Some(s) = self.sink_by_name.get(&old_sink.name)
347 && s.id.sink_id == id
348 {
349 self.sink_by_name.remove(&old_sink.name);
350 }
351
352 self.sink_by_name.insert(name, sink_ref.clone());
353 self.sink_by_id.insert(id, sink_ref);
354 }
355
356 pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
357 self.table_incoming_sinks.get(&table_id)
358 }
359
360 pub fn create_subscription(&mut self, prost: &PbSubscription) {
361 let name = prost.name.clone();
362 let id = prost.id;
363 let subscription_catalog = SubscriptionCatalog::from(prost);
364 let subscription_ref = Arc::new(subscription_catalog);
365
366 self.subscription_by_name
367 .try_insert(name, subscription_ref.clone())
368 .unwrap();
369 self.subscription_by_id
370 .try_insert(id, subscription_ref)
371 .unwrap();
372 }
373
374 pub fn drop_subscription(&mut self, id: SubscriptionId) {
375 let subscription_ref = self.subscription_by_id.remove(&id);
376 if let Some(subscription_ref) = subscription_ref {
377 self.subscription_by_name.remove(&subscription_ref.name);
378 }
379 }
380
381 pub fn update_subscription(&mut self, prost: &PbSubscription) {
382 let name = prost.name.clone();
383 let id = prost.id;
384 let subscription = SubscriptionCatalog::from(prost);
385 let subscription_ref = Arc::new(subscription);
386
387 let old_subscription = self.subscription_by_id.get(&id).unwrap();
388 if old_subscription.name != name
390 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
391 && s.id.subscription_id == id
392 {
393 self.subscription_by_name.remove(&old_subscription.name);
394 }
395
396 self.subscription_by_name
397 .insert(name, subscription_ref.clone());
398 self.subscription_by_id.insert(id, subscription_ref);
399 }
400
401 pub fn create_view(&mut self, prost: &PbView) {
402 let name = prost.name.clone();
403 let id = prost.id;
404 let view = ViewCatalog::from(prost);
405 let view_ref = Arc::new(view);
406
407 self.view_by_name
408 .try_insert(name, view_ref.clone())
409 .unwrap();
410 self.view_by_id.try_insert(id, view_ref).unwrap();
411 }
412
413 pub fn drop_view(&mut self, id: ViewId) {
414 let view_ref = self.view_by_id.remove(&id).unwrap();
415 self.view_by_name.remove(&view_ref.name).unwrap();
416 }
417
418 pub fn update_view(&mut self, prost: &PbView) {
419 let name = prost.name.clone();
420 let id = prost.id;
421 let view = ViewCatalog::from(prost);
422 let view_ref = Arc::new(view);
423
424 let old_view = self.view_by_id.get(&id).unwrap();
425 if old_view.name != name
427 && let Some(v) = self.view_by_name.get(old_view.name())
428 && v.id == id
429 {
430 self.view_by_name.remove(&old_view.name);
431 }
432
433 self.view_by_name.insert(name, view_ref.clone());
434 self.view_by_id.insert(id, view_ref);
435 }
436
437 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
438 FuncSign {
439 name: FuncName::Udf(func.name.clone()),
440 inputs_type: func
441 .arg_types
442 .iter()
443 .map(|t| t.clone().into())
444 .collect_vec(),
445 variadic: false,
446 ret_type: func.return_type.clone().into(),
447 build: FuncBuilder::Udf,
448 type_infer: |_| Ok(DataType::Boolean),
450 deprecated: false,
451 }
452 }
453
454 pub fn create_function(&mut self, prost: &PbFunction) {
455 let name = prost.name.clone();
456 let id = prost.id;
457 let function = FunctionCatalog::from(prost);
458 let args = function.arg_types.clone();
459 let function_ref = Arc::new(function);
460
461 self.function_registry
462 .insert(Self::get_func_sign(&function_ref));
463 self.function_by_name
464 .entry(name)
465 .or_default()
466 .try_insert(args, function_ref.clone())
467 .expect("function already exists with same argument types");
468 self.function_by_id
469 .try_insert(id.into(), function_ref)
470 .expect("function id exists");
471 }
472
473 pub fn drop_function(&mut self, id: FunctionId) {
474 let function_ref = self
475 .function_by_id
476 .remove(&id)
477 .expect("function not found by id");
478
479 self.function_registry
480 .remove(Self::get_func_sign(&function_ref))
481 .expect("function not found in registry");
482
483 self.function_by_name
484 .get_mut(&function_ref.name)
485 .expect("function not found by name")
486 .remove(&function_ref.arg_types)
487 .expect("function not found by argument types");
488 }
489
490 pub fn update_function(&mut self, prost: &PbFunction) {
491 let name = prost.name.clone();
492 let id = prost.id.into();
493 let function = FunctionCatalog::from(prost);
494 let function_ref = Arc::new(function);
495
496 let old_function_by_id = self.function_by_id.get(&id).unwrap();
497 let old_function_by_name = self
498 .function_by_name
499 .get_mut(&old_function_by_id.name)
500 .unwrap();
501 if old_function_by_id.name != name
503 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
504 && f.id == id
505 {
506 old_function_by_name.remove(&old_function_by_id.arg_types);
507 if old_function_by_name.is_empty() {
508 self.function_by_name.remove(&old_function_by_id.name);
509 }
510 }
511
512 self.function_by_name
513 .entry(name)
514 .or_default()
515 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
516 self.function_by_id.insert(id, function_ref);
517 }
518
519 pub fn create_connection(&mut self, prost: &PbConnection) {
520 let name = prost.name.clone();
521 let id = prost.id;
522 let connection = ConnectionCatalog::from(prost);
523 let connection_ref = Arc::new(connection);
524 self.connection_by_name
525 .try_insert(name, connection_ref.clone())
526 .unwrap();
527 self.connection_by_id
528 .try_insert(id, connection_ref)
529 .unwrap();
530 }
531
532 pub fn update_connection(&mut self, prost: &PbConnection) {
533 let name = prost.name.clone();
534 let id = prost.id;
535 let connection = ConnectionCatalog::from(prost);
536 let connection_ref = Arc::new(connection);
537
538 let old_connection = self.connection_by_id.get(&id).unwrap();
539 if old_connection.name != name
541 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
542 && conn.id == id
543 {
544 self.connection_by_name.remove(&old_connection.name);
545 }
546
547 self.connection_by_name.insert(name, connection_ref.clone());
548 self.connection_by_id.insert(id, connection_ref);
549 }
550
551 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
552 let connection_ref = self
553 .connection_by_id
554 .remove(&connection_id)
555 .expect("connection not found by id");
556 self.connection_by_name
557 .remove(&connection_ref.name)
558 .expect("connection not found by name");
559 }
560
561 pub fn create_secret(&mut self, prost: &PbSecret) {
562 let name = prost.name.clone();
563 let id = SecretId::new(prost.id);
564 let secret = SecretCatalog::from(prost);
565 let secret_ref = Arc::new(secret);
566
567 self.secret_by_id
568 .try_insert(id, secret_ref.clone())
569 .unwrap();
570 self.secret_by_name.try_insert(name, secret_ref).unwrap();
571 }
572
573 pub fn update_secret(&mut self, prost: &PbSecret) {
574 let name = prost.name.clone();
575 let id = SecretId::new(prost.id);
576 let secret = SecretCatalog::from(prost);
577 let secret_ref = Arc::new(secret);
578
579 let old_secret = self.secret_by_id.get(&id).unwrap();
580 if old_secret.name != name
582 && let Some(s) = self.secret_by_name.get(&old_secret.name)
583 && s.id == id
584 {
585 self.secret_by_name.remove(&old_secret.name);
586 }
587
588 self.secret_by_name.insert(name, secret_ref.clone());
589 self.secret_by_id.insert(id, secret_ref);
590 }
591
592 pub fn drop_secret(&mut self, secret_id: SecretId) {
593 let secret_ref = self
594 .secret_by_id
595 .remove(&secret_id)
596 .expect("secret not found by id");
597 self.secret_by_name
598 .remove(&secret_ref.name)
599 .expect("secret not found by name");
600 }
601
602 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
603 self.table_by_name.values()
604 }
605
606 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
607 self.table_by_name.values().filter(|v| v.is_user_table())
608 }
609
610 pub fn iter_user_table_with_acl<'a>(
611 &'a self,
612 user: &'a UserCatalog,
613 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
614 self.table_by_name
615 .values()
616 .filter(|v| v.is_user_table() && has_access_to_object(user, v.id.table_id, v.owner))
617 }
618
619 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
620 self.table_by_name
621 .values()
622 .filter(|v| v.is_internal_table())
623 }
624
625 pub fn iter_internal_table_with_acl<'a>(
626 &'a self,
627 user: &'a UserCatalog,
628 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
629 self.table_by_name
630 .values()
631 .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner))
632 }
633
634 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
636 self.table_by_name
637 .values()
638 .filter(|v| !v.is_internal_table())
639 }
640
641 pub fn iter_table_mv_indices_with_acl<'a>(
642 &'a self,
643 user: &'a UserCatalog,
644 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
645 self.table_by_name.values().filter(|v| {
646 !v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner)
647 })
648 }
649
650 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
652 self.table_by_name.values().filter(|v| v.is_mview())
653 }
654
655 pub fn iter_all_mvs_with_acl<'a>(
656 &'a self,
657 user: &'a UserCatalog,
658 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
659 self.table_by_name
660 .values()
661 .filter(|v| v.is_mview() && has_access_to_object(user, v.id.table_id, v.owner))
662 }
663
664 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
666 self.table_by_name
667 .values()
668 .filter(|v| v.is_mview() && v.is_created())
669 }
670
671 pub fn iter_created_mvs_with_acl<'a>(
672 &'a self,
673 user: &'a UserCatalog,
674 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
675 self.table_by_name.values().filter(|v| {
676 v.is_mview() && v.is_created() && has_access_to_object(user, v.id.table_id, v.owner)
677 })
678 }
679
680 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
682 self.index_by_name.values()
683 }
684
685 pub fn iter_index_with_acl<'a>(
686 &'a self,
687 user: &'a UserCatalog,
688 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
689 self.index_by_name
690 .values()
691 .filter(|idx| has_access_to_object(user, idx.id.index_id, idx.owner()))
692 }
693
694 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
696 self.source_by_name.values()
697 }
698
699 pub fn iter_source_with_acl<'a>(
700 &'a self,
701 user: &'a UserCatalog,
702 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
703 self.source_by_name
704 .values()
705 .filter(|s| has_access_to_object(user, s.id, s.owner))
706 }
707
708 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
709 self.sink_by_name.values()
710 }
711
712 pub fn iter_sink_with_acl<'a>(
713 &'a self,
714 user: &'a UserCatalog,
715 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
716 self.sink_by_name
717 .values()
718 .filter(|s| has_access_to_object(user, s.id.sink_id, s.owner.user_id))
719 }
720
721 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
722 self.subscription_by_name.values()
723 }
724
725 pub fn iter_subscription_with_acl<'a>(
726 &'a self,
727 user: &'a UserCatalog,
728 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
729 self.subscription_by_name
730 .values()
731 .filter(|s| has_access_to_object(user, s.id.subscription_id, s.owner.user_id))
732 }
733
734 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
735 self.view_by_name.values()
736 }
737
738 pub fn iter_view_with_acl<'a>(
739 &'a self,
740 user: &'a UserCatalog,
741 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
742 self.view_by_name
743 .values()
744 .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
745 }
746
747 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
748 self.function_by_name.values().flat_map(|v| v.values())
749 }
750
751 pub fn iter_function_with_acl<'a>(
752 &'a self,
753 user: &'a UserCatalog,
754 ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
755 self.function_by_name
756 .values()
757 .flat_map(|v| v.values())
758 .filter(|f| has_access_to_object(user, f.id.function_id(), f.owner))
759 }
760
761 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
762 self.connection_by_name.values()
763 }
764
765 pub fn iter_connections_with_acl<'a>(
766 &'a self,
767 user: &'a UserCatalog,
768 ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
769 self.connection_by_name
770 .values()
771 .filter(|c| has_access_to_object(user, c.id, c.owner))
772 }
773
774 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
775 self.secret_by_name.values()
776 }
777
778 pub fn iter_secret_with_acl<'a>(
779 &'a self,
780 user: &'a UserCatalog,
781 ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
782 self.secret_by_name
783 .values()
784 .filter(|s| has_access_to_object(user, s.id.secret_id(), s.owner))
785 }
786
787 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
788 self.system_table_by_name.values()
789 }
790
791 pub fn get_table_by_name(
792 &self,
793 table_name: &str,
794 bind_creating_relations: bool,
795 ) -> Option<&Arc<TableCatalog>> {
796 self.table_by_name
797 .get(table_name)
798 .filter(|&table| bind_creating_relations || table.is_created())
799 }
800
801 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
802 self.get_table_by_name(table_name, true)
803 }
804
805 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
806 self.get_table_by_name(table_name, false)
807 }
808
809 pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
810 self.table_by_id.get(table_id)
811 }
812
813 pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
814 self.table_by_id
815 .get(table_id)
816 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
817 }
818
819 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
820 self.view_by_name.get(view_name)
821 }
822
823 pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
824 self.view_by_id.get(view_id)
825 }
826
827 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
828 self.source_by_name.get(source_name)
829 }
830
831 pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
832 self.source_by_id.get(source_id)
833 }
834
835 pub fn get_sink_by_name(
836 &self,
837 sink_name: &str,
838 bind_creating: bool,
839 ) -> Option<&Arc<SinkCatalog>> {
840 self.sink_by_name
841 .get(sink_name)
842 .filter(|s| bind_creating || s.is_created())
843 }
844
845 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
846 self.get_sink_by_name(sink_name, true)
847 }
848
849 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
850 self.get_sink_by_name(sink_name, false)
851 }
852
853 pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
854 self.sink_by_id.get(sink_id)
855 }
856
857 pub fn get_subscription_by_name(
858 &self,
859 subscription_name: &str,
860 ) -> Option<&Arc<SubscriptionCatalog>> {
861 self.subscription_by_name.get(subscription_name)
862 }
863
864 pub fn get_subscription_by_id(
865 &self,
866 subscription_id: &SubscriptionId,
867 ) -> Option<&Arc<SubscriptionCatalog>> {
868 self.subscription_by_id.get(subscription_id)
869 }
870
871 pub fn get_index_by_name(
872 &self,
873 index_name: &str,
874 bind_creating: bool,
875 ) -> Option<&Arc<IndexCatalog>> {
876 self.index_by_name
877 .get(index_name)
878 .filter(|i| bind_creating || i.is_created())
879 }
880
881 pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
882 self.get_index_by_name(index_name, true)
883 }
884
885 pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
886 self.get_index_by_name(index_name, false)
887 }
888
889 pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
890 self.index_by_id.get(index_id)
891 }
892
893 pub fn get_indexes_by_table_id(
894 &self,
895 table_id: &TableId,
896 include_creating: bool,
897 ) -> Vec<Arc<IndexCatalog>> {
898 self.indexes_by_table_id
899 .get(table_id)
900 .cloned()
901 .unwrap_or_default()
902 .into_iter()
903 .filter(|i| include_creating || i.is_created())
904 .collect()
905 }
906
907 pub fn get_any_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
908 self.get_indexes_by_table_id(table_id, true)
909 }
910
911 pub fn get_created_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
912 self.get_indexes_by_table_id(table_id, false)
913 }
914
915 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
916 self.system_table_by_name.get(table_name)
917 }
918
919 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
920 self.table_by_id
921 .get(&table_id)
922 .map(|table| table.name.clone())
923 }
924
925 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
926 self.function_by_id.get(&function_id)
927 }
928
929 pub fn get_function_by_name_inputs(
930 &self,
931 name: &str,
932 inputs: &mut [ExprImpl],
933 ) -> Option<&Arc<FunctionCatalog>> {
934 infer_type_with_sigmap(
935 FuncName::Udf(name.to_owned()),
936 inputs,
937 &self.function_registry,
938 )
939 .ok()?;
940 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
941 self.function_by_name.get(name)?.get(&args)
942 }
943
944 pub fn get_function_by_name_args(
945 &self,
946 name: &str,
947 args: &[DataType],
948 ) -> Option<&Arc<FunctionCatalog>> {
949 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
950 let func = infer_type_name(
951 &self.function_registry,
952 FuncName::Udf(name.to_owned()),
953 &args,
954 )
955 .ok()?;
956
957 let args = func
958 .inputs_type
959 .iter()
960 .filter_map(|x| {
961 if let SigDataType::Exact(t) = x {
962 Some(t.clone())
963 } else {
964 None
965 }
966 })
967 .collect_vec();
968
969 self.function_by_name.get(name)?.get(&args)
970 }
971
972 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
973 let functions = self.function_by_name.get(name)?;
974 if functions.is_empty() {
975 return None;
976 }
977 Some(functions.values().collect())
978 }
979
980 pub fn get_connection_by_id(
981 &self,
982 connection_id: &ConnectionId,
983 ) -> Option<&Arc<ConnectionCatalog>> {
984 self.connection_by_id.get(connection_id)
985 }
986
987 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
988 self.connection_by_name.get(connection_name)
989 }
990
991 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
992 self.secret_by_name.get(secret_name)
993 }
994
995 pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
996 self.secret_by_id.get(secret_id)
997 }
998
999 pub fn get_source_ids_by_connection(
1001 &self,
1002 connection_id: ConnectionId,
1003 ) -> Option<Vec<SourceId>> {
1004 self.connection_source_ref
1005 .get(&connection_id)
1006 .map(|c| c.to_owned())
1007 }
1008
1009 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1011 self.connection_sink_ref
1012 .get(&connection_id)
1013 .map(|s| s.to_owned())
1014 }
1015
1016 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
1017 #[allow(clippy::manual_map)]
1018 if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
1019 Some(OwnedGrantObject {
1020 owner: table.owner,
1021 object: Object::TableId(oid),
1022 })
1023 } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
1024 Some(OwnedGrantObject {
1025 owner: index.owner(),
1026 object: Object::TableId(oid),
1027 })
1028 } else if let Some(source) = self.get_source_by_id(&oid) {
1029 Some(OwnedGrantObject {
1030 owner: source.owner,
1031 object: Object::SourceId(oid),
1032 })
1033 } else if let Some(sink) = self.get_sink_by_id(&oid) {
1034 Some(OwnedGrantObject {
1035 owner: sink.owner.user_id,
1036 object: Object::SinkId(oid),
1037 })
1038 } else if let Some(view) = self.get_view_by_id(&oid) {
1039 Some(OwnedGrantObject {
1040 owner: view.owner,
1041 object: Object::ViewId(oid),
1042 })
1043 } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
1044 Some(OwnedGrantObject {
1045 owner: function.owner(),
1046 object: Object::FunctionId(oid),
1047 })
1048 } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
1049 Some(OwnedGrantObject {
1050 owner: subscription.owner.user_id,
1051 object: Object::SubscriptionId(oid),
1052 })
1053 } else if let Some(connection) = self.get_connection_by_id(&oid) {
1054 Some(OwnedGrantObject {
1055 owner: connection.owner,
1056 object: Object::ConnectionId(oid),
1057 })
1058 } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
1059 Some(OwnedGrantObject {
1060 owner: secret.owner,
1061 object: Object::SecretId(oid),
1062 })
1063 } else {
1064 None
1065 }
1066 }
1067
1068 pub fn contains_object(&self, oid: u32) -> bool {
1069 self.table_by_id.contains_key(&TableId::new(oid))
1070 || self.index_by_id.contains_key(&IndexId::new(oid))
1071 || self.source_by_id.contains_key(&oid)
1072 || self.sink_by_id.contains_key(&oid)
1073 || self.view_by_id.contains_key(&oid)
1074 || self.function_by_id.contains_key(&FunctionId::new(oid))
1075 || self.subscription_by_id.contains_key(&oid)
1076 || self.connection_by_id.contains_key(&oid)
1077 }
1078
1079 pub fn id(&self) -> SchemaId {
1080 self.id
1081 }
1082
1083 pub fn database_id(&self) -> DatabaseId {
1084 self.database_id
1085 }
1086
1087 pub fn name(&self) -> String {
1088 self.name.clone()
1089 }
1090}
1091
1092impl OwnedByUserCatalog for SchemaCatalog {
1093 fn owner(&self) -> UserId {
1094 self.owner
1095 }
1096}
1097
1098impl From<&PbSchema> for SchemaCatalog {
1099 fn from(schema: &PbSchema) -> Self {
1100 Self {
1101 id: schema.id,
1102 owner: schema.owner,
1103 name: schema.name.clone(),
1104 database_id: schema.database_id,
1105 table_by_name: HashMap::new(),
1106 table_by_id: HashMap::new(),
1107 source_by_name: HashMap::new(),
1108 source_by_id: HashMap::new(),
1109 sink_by_name: HashMap::new(),
1110 sink_by_id: HashMap::new(),
1111 table_incoming_sinks: HashMap::new(),
1112 index_by_name: HashMap::new(),
1113 index_by_id: HashMap::new(),
1114 indexes_by_table_id: HashMap::new(),
1115 system_table_by_name: HashMap::new(),
1116 view_by_name: HashMap::new(),
1117 view_by_id: HashMap::new(),
1118 function_registry: FunctionRegistry::default(),
1119 function_by_name: HashMap::new(),
1120 function_by_id: HashMap::new(),
1121 connection_by_name: HashMap::new(),
1122 connection_by_id: HashMap::new(),
1123 secret_by_name: HashMap::new(),
1124 secret_by_id: HashMap::new(),
1125 _secret_source_ref: HashMap::new(),
1126 _secret_sink_ref: HashMap::new(),
1127 connection_source_ref: HashMap::new(),
1128 connection_sink_ref: HashMap::new(),
1129 subscription_by_name: HashMap::new(),
1130 subscription_by_id: HashMap::new(),
1131 }
1132 }
1133}