1use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47 id: SchemaId,
48 pub name: String,
49 pub database_id: DatabaseId,
50 table_by_name: HashMap<String, Arc<TableCatalog>>,
52 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54 source_by_name: HashMap<String, Arc<SourceCatalog>>,
55 source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56 sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57 sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58 table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60 subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61 subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62 index_by_name: HashMap<String, Arc<IndexCatalog>>,
63 index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64 indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65 view_by_name: HashMap<String, Arc<ViewCatalog>>,
66 view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67 function_registry: FunctionRegistry,
68 function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69 function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70 connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71 connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72 secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73 secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75 _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76 _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78 connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80 connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82 system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84 pub owner: UserId,
85}
86
87impl SchemaCatalog {
88 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89 let name = prost.name.clone();
90 let id = prost.id;
91 let table: TableCatalog = prost.into();
92 let table_ref = Arc::new(table);
93
94 self.table_by_name
95 .try_insert(name, table_ref.clone())
96 .unwrap();
97 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98 table_ref
99 }
100
101 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102 self.system_table_by_name
103 .try_insert(sys_table.name.clone(), sys_table)
104 .unwrap();
105 }
106
107 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108 self.view_by_name
109 .try_insert(sys_view.name().to_owned(), sys_view.clone())
110 .unwrap();
111 self.view_by_id
112 .try_insert(sys_view.id, sys_view.clone())
113 .unwrap();
114 }
115
116 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117 let name = prost.name.clone();
118 let id = prost.id;
119 let table: TableCatalog = prost.into();
120 let table_ref = Arc::new(table);
121
122 let old_table = self.table_by_id.get(&id).unwrap();
123 if old_table.name() != name
125 && let Some(t) = self.table_by_name.get(old_table.name())
126 && t.id == id
127 {
128 self.table_by_name.remove(old_table.name());
129 }
130
131 self.table_by_name.insert(name, table_ref.clone());
132 self.table_by_id.insert(id, table_ref.clone());
133 table_ref
134 }
135
136 pub fn update_index(&mut self, prost: &PbIndex) {
137 let name = prost.name.clone();
138 let id = prost.id;
139 let old_index = self.index_by_id.get(&id).unwrap();
140 let index_table = self.get_created_table_by_id(prost.index_table_id).unwrap();
141 let primary_table = self
142 .get_created_table_by_id(prost.primary_table_id)
143 .unwrap();
144 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145 let index_ref = Arc::new(index);
146
147 if old_index.name != name
149 && let Some(idx) = self.index_by_name.get(&old_index.name)
150 && idx.id == id
151 {
152 self.index_by_name.remove(&old_index.name);
153 }
154 self.index_by_name.insert(name, index_ref.clone());
155 self.index_by_id.insert(id, index_ref.clone());
156
157 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158 Occupied(mut entry) => {
159 let pos = entry
160 .get()
161 .iter()
162 .position(|x| x.id == index_ref.id)
163 .unwrap();
164 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165 }
166 Vacant(_entry) => {
167 unreachable!()
168 }
169 };
170 }
171
172 pub fn drop_table(&mut self, id: TableId) {
173 if let Some(table_ref) = self.table_by_id.remove(&id) {
174 self.table_by_name.remove(&table_ref.name).unwrap();
175 self.indexes_by_table_id.remove(&table_ref.id);
176 } else {
177 tracing::warn!(
178 %id,
179 "table not found when dropping, frontend might not be notified yet"
180 );
181 }
182 }
183
184 pub fn create_index(&mut self, prost: &PbIndex) {
185 let name = prost.name.clone();
186 let id = prost.id;
187 let index_table = self.get_table_by_id(prost.index_table_id).unwrap();
188 let primary_table = self
189 .get_created_table_by_id(prost.primary_table_id)
190 .unwrap();
191 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192 let index_ref = Arc::new(index);
193
194 self.index_by_name
195 .try_insert(name, index_ref.clone())
196 .unwrap();
197 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199 Occupied(mut entry) => {
200 entry.get_mut().push(index_ref);
201 }
202 Vacant(entry) => {
203 entry.insert(vec![index_ref]);
204 }
205 };
206 }
207
208 pub fn drop_index(&mut self, id: IndexId) {
209 let index_ref = self.index_by_id.remove(&id).unwrap();
210 self.index_by_name.remove(&index_ref.name).unwrap();
211 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212 Occupied(mut entry) => {
213 let pos = entry
214 .get_mut()
215 .iter()
216 .position(|x| x.id == index_ref.id)
217 .unwrap();
218 entry.get_mut().remove(pos);
219 }
220 Vacant(_entry) => (),
221 };
222 }
223
224 pub fn create_source(&mut self, prost: &PbSource) {
225 let name = prost.name.clone();
226 let id = prost.id;
227 let source = SourceCatalog::from(prost);
228 let source_ref = Arc::new(source);
229
230 if let Some(connection_id) = source_ref.connection_id {
231 self.connection_source_ref
232 .entry(connection_id)
233 .and_modify(|sources| sources.push(source_ref.id))
234 .or_insert(vec![source_ref.id]);
235 }
236
237 self.source_by_name
238 .try_insert(name, source_ref.clone())
239 .unwrap();
240 self.source_by_id.try_insert(id, source_ref).unwrap();
241 }
242
243 pub fn drop_source(&mut self, id: SourceId) {
244 let source_ref = self.source_by_id.remove(&id).unwrap();
245 self.source_by_name.remove(&source_ref.name).unwrap();
246 if let Some(connection_id) = source_ref.connection_id
247 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248 {
249 let source_ids = e.get_mut();
250 source_ids.retain_mut(|sid| *sid != id);
251 if source_ids.is_empty() {
252 e.remove_entry();
253 }
254 }
255 }
256
257 pub fn update_source(&mut self, prost: &PbSource) {
258 let name = prost.name.clone();
259 let id = prost.id;
260 let source = SourceCatalog::from(prost);
261 let source_ref = Arc::new(source);
262
263 let old_source = self.source_by_id.get(&id).unwrap();
264 if old_source.name != name
266 && let Some(src) = self.source_by_name.get(&old_source.name)
267 && src.id == id
268 {
269 self.source_by_name.remove(&old_source.name);
270 }
271
272 self.source_by_name.insert(name, source_ref.clone());
273 self.source_by_id.insert(id, source_ref);
274 }
275
276 pub fn create_sink(&mut self, prost: &PbSink) {
277 let name = prost.name.clone();
278 let id = prost.id;
279 let sink = SinkCatalog::from(prost);
280 let sink_ref = Arc::new(sink);
281
282 if let Some(connection_id) = sink_ref.connection_id {
283 self.connection_sink_ref
284 .entry(connection_id)
285 .and_modify(|sinks| sinks.push(id))
286 .or_insert(vec![id]);
287 }
288
289 if let Some(target_table) = sink_ref.target_table {
290 assert!(
291 self.table_incoming_sinks
292 .entry(target_table)
293 .or_default()
294 .insert(sink_ref.id)
295 );
296 }
297
298 self.sink_by_name
299 .try_insert(name, sink_ref.clone())
300 .unwrap();
301 self.sink_by_id.try_insert(id, sink_ref).unwrap();
302 }
303
304 pub fn drop_sink(&mut self, id: SinkId) {
305 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
306 self.sink_by_name.remove(&sink_ref.name).unwrap();
307 if let Some(connection_id) = sink_ref.connection_id
308 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id)
309 {
310 let sink_ids = e.get_mut();
311 sink_ids.retain_mut(|sid| *sid != id);
312 if sink_ids.is_empty() {
313 e.remove_entry();
314 }
315 }
316 if let Some(target_table) = sink_ref.target_table {
317 let incoming_sinks = self
318 .table_incoming_sinks
319 .get_mut(&target_table)
320 .expect("should exists");
321 assert!(incoming_sinks.remove(&sink_ref.id));
322 if incoming_sinks.is_empty() {
323 self.table_incoming_sinks.remove(&target_table);
324 }
325 }
326 } else {
327 tracing::warn!(
328 %id,
329 "sink not found when dropping, frontend might not be notified yet"
330 );
331 }
332 }
333
334 pub fn update_sink(&mut self, prost: &PbSink) {
335 let name = prost.name.clone();
336 let id = prost.id;
337 let sink = SinkCatalog::from(prost);
338 let sink_ref = Arc::new(sink);
339
340 let old_sink = self.sink_by_id.get(&id).unwrap();
341 assert_eq!(sink_ref.target_table, old_sink.target_table);
342 if old_sink.name != name
344 && let Some(s) = self.sink_by_name.get(&old_sink.name)
345 && s.id == id
346 {
347 self.sink_by_name.remove(&old_sink.name);
348 }
349
350 self.sink_by_name.insert(name, sink_ref.clone());
351 self.sink_by_id.insert(id, sink_ref);
352 }
353
354 pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
355 self.table_incoming_sinks.get(&table_id)
356 }
357
358 pub fn create_subscription(&mut self, prost: &PbSubscription) {
359 let name = prost.name.clone();
360 let id = prost.id;
361 let subscription_catalog = SubscriptionCatalog::from(prost);
362 let subscription_ref = Arc::new(subscription_catalog);
363
364 self.subscription_by_name
365 .try_insert(name, subscription_ref.clone())
366 .unwrap();
367 self.subscription_by_id
368 .try_insert(id, subscription_ref)
369 .unwrap();
370 }
371
372 pub fn drop_subscription(&mut self, id: SubscriptionId) {
373 let subscription_ref = self.subscription_by_id.remove(&id);
374 if let Some(subscription_ref) = subscription_ref {
375 self.subscription_by_name.remove(&subscription_ref.name);
376 }
377 }
378
379 pub fn update_subscription(&mut self, prost: &PbSubscription) {
380 let name = prost.name.clone();
381 let id = prost.id;
382 let subscription = SubscriptionCatalog::from(prost);
383 let subscription_ref = Arc::new(subscription);
384
385 let old_subscription = self.subscription_by_id.get(&id).unwrap();
386 if old_subscription.name != name
388 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
389 && s.id == id
390 {
391 self.subscription_by_name.remove(&old_subscription.name);
392 }
393
394 self.subscription_by_name
395 .insert(name, subscription_ref.clone());
396 self.subscription_by_id.insert(id, subscription_ref);
397 }
398
399 pub fn create_view(&mut self, prost: &PbView) {
400 let name = prost.name.clone();
401 let id = prost.id;
402 let view = ViewCatalog::from(prost);
403 let view_ref = Arc::new(view);
404
405 self.view_by_name
406 .try_insert(name, view_ref.clone())
407 .unwrap();
408 self.view_by_id.try_insert(id, view_ref).unwrap();
409 }
410
411 pub fn drop_view(&mut self, id: ViewId) {
412 let view_ref = self.view_by_id.remove(&id).unwrap();
413 self.view_by_name.remove(&view_ref.name).unwrap();
414 }
415
416 pub fn update_view(&mut self, prost: &PbView) {
417 let name = prost.name.clone();
418 let id = prost.id;
419 let view = ViewCatalog::from(prost);
420 let view_ref = Arc::new(view);
421
422 let old_view = self.view_by_id.get(&id).unwrap();
423 if old_view.name != name
425 && let Some(v) = self.view_by_name.get(old_view.name())
426 && v.id == id
427 {
428 self.view_by_name.remove(&old_view.name);
429 }
430
431 self.view_by_name.insert(name, view_ref.clone());
432 self.view_by_id.insert(id, view_ref);
433 }
434
435 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
436 FuncSign {
437 name: FuncName::Udf(func.name.clone()),
438 inputs_type: func
439 .arg_types
440 .iter()
441 .map(|t| t.clone().into())
442 .collect_vec(),
443 variadic: false,
444 ret_type: func.return_type.clone().into(),
445 build: FuncBuilder::Udf,
446 type_infer: |_| Ok(DataType::Boolean),
448 deprecated: false,
449 }
450 }
451
452 pub fn create_function(&mut self, prost: &PbFunction) {
453 let name = prost.name.clone();
454 let id = prost.id;
455 let function = FunctionCatalog::from(prost);
456 let args = function.arg_types.clone();
457 let function_ref = Arc::new(function);
458
459 self.function_registry
460 .insert(Self::get_func_sign(&function_ref));
461 self.function_by_name
462 .entry(name)
463 .or_default()
464 .try_insert(args, function_ref.clone())
465 .expect("function already exists with same argument types");
466 self.function_by_id
467 .try_insert(id, function_ref)
468 .expect("function id exists");
469 }
470
471 pub fn drop_function(&mut self, id: FunctionId) {
472 let function_ref = self
473 .function_by_id
474 .remove(&id)
475 .expect("function not found by id");
476
477 self.function_registry
478 .remove(Self::get_func_sign(&function_ref))
479 .expect("function not found in registry");
480
481 self.function_by_name
482 .get_mut(&function_ref.name)
483 .expect("function not found by name")
484 .remove(&function_ref.arg_types)
485 .expect("function not found by argument types");
486 }
487
488 pub fn update_function(&mut self, prost: &PbFunction) {
489 let name = prost.name.clone();
490 let id = prost.id;
491 let function = FunctionCatalog::from(prost);
492 let function_ref = Arc::new(function);
493
494 let old_function_by_id = self.function_by_id.get(&id).unwrap();
495 let old_function_by_name = self
496 .function_by_name
497 .get_mut(&old_function_by_id.name)
498 .unwrap();
499 if old_function_by_id.name != name
501 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
502 && f.id == id
503 {
504 old_function_by_name.remove(&old_function_by_id.arg_types);
505 if old_function_by_name.is_empty() {
506 self.function_by_name.remove(&old_function_by_id.name);
507 }
508 }
509
510 self.function_by_name
511 .entry(name)
512 .or_default()
513 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
514 self.function_by_id.insert(id, function_ref);
515 }
516
517 pub fn create_connection(&mut self, prost: &PbConnection) {
518 let name = prost.name.clone();
519 let id = prost.id;
520 let connection = ConnectionCatalog::from(prost);
521 let connection_ref = Arc::new(connection);
522 self.connection_by_name
523 .try_insert(name, connection_ref.clone())
524 .unwrap();
525 self.connection_by_id
526 .try_insert(id, connection_ref)
527 .unwrap();
528 }
529
530 pub fn update_connection(&mut self, prost: &PbConnection) {
531 let name = prost.name.clone();
532 let id = prost.id;
533 let connection = ConnectionCatalog::from(prost);
534 let connection_ref = Arc::new(connection);
535
536 let old_connection = self.connection_by_id.get(&id).unwrap();
537 if old_connection.name != name
539 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
540 && conn.id == id
541 {
542 self.connection_by_name.remove(&old_connection.name);
543 }
544
545 self.connection_by_name.insert(name, connection_ref.clone());
546 self.connection_by_id.insert(id, connection_ref);
547 }
548
549 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
550 let connection_ref = self
551 .connection_by_id
552 .remove(&connection_id)
553 .expect("connection not found by id");
554 self.connection_by_name
555 .remove(&connection_ref.name)
556 .expect("connection not found by name");
557 }
558
559 pub fn create_secret(&mut self, prost: &PbSecret) {
560 let name = prost.name.clone();
561 let id = prost.id;
562 let secret = SecretCatalog::from(prost);
563 let secret_ref = Arc::new(secret);
564
565 self.secret_by_id
566 .try_insert(id, secret_ref.clone())
567 .unwrap();
568 self.secret_by_name.try_insert(name, secret_ref).unwrap();
569 }
570
571 pub fn update_secret(&mut self, prost: &PbSecret) {
572 let name = prost.name.clone();
573 let id = prost.id;
574 let secret = SecretCatalog::from(prost);
575 let secret_ref = Arc::new(secret);
576
577 let old_secret = self.secret_by_id.get(&id).unwrap();
578 if old_secret.name != name
580 && let Some(s) = self.secret_by_name.get(&old_secret.name)
581 && s.id == id
582 {
583 self.secret_by_name.remove(&old_secret.name);
584 }
585
586 self.secret_by_name.insert(name, secret_ref.clone());
587 self.secret_by_id.insert(id, secret_ref);
588 }
589
590 pub fn drop_secret(&mut self, secret_id: SecretId) {
591 let secret_ref = self
592 .secret_by_id
593 .remove(&secret_id)
594 .expect("secret not found by id");
595 self.secret_by_name
596 .remove(&secret_ref.name)
597 .expect("secret not found by name");
598 }
599
600 pub fn iter_object_ids(&self) -> impl Iterator<Item = ObjectId> + '_ {
601 self.table_by_id
602 .keys()
603 .map(|id| id.as_object_id())
604 .chain(self.source_by_id.keys().map(|id| id.as_object_id()))
605 .chain(self.sink_by_id.keys().map(|id| id.as_object_id()))
606 .chain(self.subscription_by_id.keys().map(|id| id.as_object_id()))
607 .chain(self.index_by_id.keys().map(|id| id.as_object_id()))
608 .chain(self.view_by_id.keys().map(|id| id.as_object_id()))
609 .chain(self.function_by_id.keys().map(|id| id.as_object_id()))
610 .chain(self.connection_by_id.keys().map(|id| id.as_object_id()))
611 .chain(self.secret_by_id.keys().map(|id| id.as_object_id()))
612 }
613
614 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
615 self.table_by_name.values()
616 }
617
618 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
619 self.table_by_name.values().filter(|v| v.is_user_table())
620 }
621
622 pub fn iter_user_table_with_acl<'a>(
623 &'a self,
624 user: &'a UserCatalog,
625 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
626 self.table_by_name
627 .values()
628 .filter(|v| v.is_user_table() && has_access_to_object(user, v.id, v.owner))
629 }
630
631 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
632 self.table_by_name
633 .values()
634 .filter(|v| v.is_internal_table())
635 }
636
637 pub fn iter_internal_table_with_acl<'a>(
638 &'a self,
639 user: &'a UserCatalog,
640 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
641 self.table_by_name
642 .values()
643 .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
644 }
645
646 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
648 self.table_by_name
649 .values()
650 .filter(|v| !v.is_internal_table())
651 }
652
653 pub fn iter_table_mv_indices_with_acl<'a>(
654 &'a self,
655 user: &'a UserCatalog,
656 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
657 self.table_by_name
658 .values()
659 .filter(|v| !v.is_internal_table() && has_access_to_object(user, v.id, v.owner))
660 }
661
662 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
664 self.table_by_name.values().filter(|v| v.is_mview())
665 }
666
667 pub fn iter_all_mvs_with_acl<'a>(
668 &'a self,
669 user: &'a UserCatalog,
670 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
671 self.table_by_name
672 .values()
673 .filter(|v| v.is_mview() && has_access_to_object(user, v.id, v.owner))
674 }
675
676 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
678 self.table_by_name
679 .values()
680 .filter(|v| v.is_mview() && v.is_created())
681 }
682
683 pub fn iter_created_mvs_with_acl<'a>(
684 &'a self,
685 user: &'a UserCatalog,
686 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
687 self.table_by_name
688 .values()
689 .filter(|v| v.is_mview() && v.is_created() && has_access_to_object(user, v.id, v.owner))
690 }
691
692 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
694 self.index_by_name.values()
695 }
696
697 pub fn iter_index_with_acl<'a>(
698 &'a self,
699 user: &'a UserCatalog,
700 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
701 self.index_by_name
702 .values()
703 .filter(|idx| has_access_to_object(user, idx.id, idx.owner()))
704 }
705
706 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
708 self.source_by_name.values()
709 }
710
711 pub fn iter_source_with_acl<'a>(
712 &'a self,
713 user: &'a UserCatalog,
714 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
715 self.source_by_name
716 .values()
717 .filter(|s| has_access_to_object(user, s.id, s.owner))
718 }
719
720 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
721 self.sink_by_name.values()
722 }
723
724 pub fn iter_sink_with_acl<'a>(
725 &'a self,
726 user: &'a UserCatalog,
727 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
728 self.sink_by_name
729 .values()
730 .filter(|s| has_access_to_object(user, s.id, s.owner))
731 }
732
733 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
734 self.subscription_by_name.values()
735 }
736
737 pub fn iter_subscription_with_acl<'a>(
738 &'a self,
739 user: &'a UserCatalog,
740 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
741 self.subscription_by_name
742 .values()
743 .filter(|s| has_access_to_object(user, s.id, s.owner))
744 }
745
746 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
747 self.view_by_name.values()
748 }
749
750 pub fn iter_view_with_acl<'a>(
751 &'a self,
752 user: &'a UserCatalog,
753 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
754 self.view_by_name
755 .values()
756 .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
757 }
758
759 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
760 self.function_by_name.values().flat_map(|v| v.values())
761 }
762
763 pub fn iter_function_with_acl<'a>(
764 &'a self,
765 user: &'a UserCatalog,
766 ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
767 self.function_by_name
768 .values()
769 .flat_map(|v| v.values())
770 .filter(|f| has_access_to_object(user, f.id, f.owner))
771 }
772
773 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
774 self.connection_by_name.values()
775 }
776
777 pub fn iter_connections_with_acl<'a>(
778 &'a self,
779 user: &'a UserCatalog,
780 ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
781 self.connection_by_name
782 .values()
783 .filter(|c| has_access_to_object(user, c.id, c.owner))
784 }
785
786 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
787 self.secret_by_name.values()
788 }
789
790 pub fn iter_secret_with_acl<'a>(
791 &'a self,
792 user: &'a UserCatalog,
793 ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
794 self.secret_by_name
795 .values()
796 .filter(|s| has_access_to_object(user, s.id, s.owner))
797 }
798
799 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
800 self.system_table_by_name.values()
801 }
802
803 pub fn get_table_by_name(
804 &self,
805 table_name: &str,
806 bind_creating_relations: bool,
807 ) -> Option<&Arc<TableCatalog>> {
808 self.table_by_name
809 .get(table_name)
810 .filter(|&table| bind_creating_relations || table.is_created())
811 }
812
813 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
814 self.get_table_by_name(table_name, true)
815 }
816
817 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
818 self.get_table_by_name(table_name, false)
819 }
820
821 pub fn get_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
822 self.table_by_id.get(&table_id)
823 }
824
825 pub fn get_created_table_by_id(&self, table_id: TableId) -> Option<&Arc<TableCatalog>> {
826 self.table_by_id
827 .get(&table_id)
828 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
829 }
830
831 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
832 self.view_by_name.get(view_name)
833 }
834
835 pub fn get_view_by_id(&self, view_id: ViewId) -> Option<&Arc<ViewCatalog>> {
836 self.view_by_id.get(&view_id)
837 }
838
839 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
840 self.source_by_name.get(source_name)
841 }
842
843 pub fn get_source_by_id(&self, source_id: SourceId) -> Option<&Arc<SourceCatalog>> {
844 self.source_by_id.get(&source_id)
845 }
846
847 pub fn get_sink_by_name(
848 &self,
849 sink_name: &str,
850 bind_creating: bool,
851 ) -> Option<&Arc<SinkCatalog>> {
852 self.sink_by_name
853 .get(sink_name)
854 .filter(|s| bind_creating || s.is_created())
855 }
856
857 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
858 self.get_sink_by_name(sink_name, true)
859 }
860
861 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
862 self.get_sink_by_name(sink_name, false)
863 }
864
865 pub fn get_sink_by_id(&self, sink_id: SinkId) -> Option<&Arc<SinkCatalog>> {
866 self.sink_by_id.get(&sink_id)
867 }
868
869 pub fn get_subscription_by_name(
870 &self,
871 subscription_name: &str,
872 ) -> Option<&Arc<SubscriptionCatalog>> {
873 self.subscription_by_name.get(subscription_name)
874 }
875
876 pub fn get_subscription_by_id(
877 &self,
878 subscription_id: SubscriptionId,
879 ) -> Option<&Arc<SubscriptionCatalog>> {
880 self.subscription_by_id.get(&subscription_id)
881 }
882
883 pub fn get_index_by_name(
884 &self,
885 index_name: &str,
886 bind_creating: bool,
887 ) -> Option<&Arc<IndexCatalog>> {
888 self.index_by_name
889 .get(index_name)
890 .filter(|i| bind_creating || i.is_created())
891 }
892
893 pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
894 self.get_index_by_name(index_name, true)
895 }
896
897 pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
898 self.get_index_by_name(index_name, false)
899 }
900
901 pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&Arc<IndexCatalog>> {
902 self.index_by_id.get(&index_id)
903 }
904
905 pub fn get_indexes_by_table_id(
906 &self,
907 table_id: TableId,
908 include_creating: bool,
909 ) -> Vec<Arc<IndexCatalog>> {
910 self.indexes_by_table_id
911 .get(&table_id)
912 .cloned()
913 .unwrap_or_default()
914 .into_iter()
915 .filter(|i| include_creating || i.is_created())
916 .collect()
917 }
918
919 pub fn get_any_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
920 self.get_indexes_by_table_id(table_id, true)
921 }
922
923 pub fn get_created_indexes_by_table_id(&self, table_id: TableId) -> Vec<Arc<IndexCatalog>> {
924 self.get_indexes_by_table_id(table_id, false)
925 }
926
927 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
928 self.system_table_by_name.get(table_name)
929 }
930
931 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
932 self.table_by_id
933 .get(&table_id)
934 .map(|table| table.name.clone())
935 }
936
937 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
938 self.function_by_id.get(&function_id)
939 }
940
941 pub fn get_function_by_name_inputs(
942 &self,
943 name: &str,
944 inputs: &mut [ExprImpl],
945 ) -> Option<&Arc<FunctionCatalog>> {
946 infer_type_with_sigmap(
947 FuncName::Udf(name.to_owned()),
948 inputs,
949 &self.function_registry,
950 )
951 .ok()?;
952 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
953 self.function_by_name.get(name)?.get(&args)
954 }
955
956 pub fn get_function_by_name_args(
957 &self,
958 name: &str,
959 args: &[DataType],
960 ) -> Option<&Arc<FunctionCatalog>> {
961 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
962 let func = infer_type_name(
963 &self.function_registry,
964 FuncName::Udf(name.to_owned()),
965 &args,
966 )
967 .ok()?;
968
969 let args = func
970 .inputs_type
971 .iter()
972 .filter_map(|x| {
973 if let SigDataType::Exact(t) = x {
974 Some(t.clone())
975 } else {
976 None
977 }
978 })
979 .collect_vec();
980
981 self.function_by_name.get(name)?.get(&args)
982 }
983
984 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
985 let functions = self.function_by_name.get(name)?;
986 if functions.is_empty() {
987 return None;
988 }
989 Some(functions.values().collect())
990 }
991
992 pub fn get_connection_by_id(
993 &self,
994 connection_id: ConnectionId,
995 ) -> Option<&Arc<ConnectionCatalog>> {
996 self.connection_by_id.get(&connection_id)
997 }
998
999 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
1000 self.connection_by_name.get(connection_name)
1001 }
1002
1003 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
1004 self.secret_by_name.get(secret_name)
1005 }
1006
1007 pub fn get_secret_by_id(&self, secret_id: SecretId) -> Option<&Arc<SecretCatalog>> {
1008 self.secret_by_id.get(&secret_id)
1009 }
1010
1011 pub fn get_source_ids_by_connection(
1013 &self,
1014 connection_id: ConnectionId,
1015 ) -> Option<Vec<SourceId>> {
1016 self.connection_source_ref
1017 .get(&connection_id)
1018 .map(|c| c.to_owned())
1019 }
1020
1021 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1023 self.connection_sink_ref
1024 .get(&connection_id)
1025 .map(|s| s.to_owned())
1026 }
1027
1028 pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
1029 #[allow(clippy::manual_map)]
1030 if let Some(table) = self.get_created_table_by_id(oid.as_table_id()) {
1031 Some(OwnedGrantObject {
1032 owner: table.owner,
1033 object: Object::TableId(oid.as_table_id()),
1034 })
1035 } else if let Some(index) = self.get_index_by_id(oid.as_index_id()) {
1036 Some(OwnedGrantObject {
1037 owner: index.owner(),
1038 object: Object::TableId(oid.as_table_id()),
1039 })
1040 } else if let Some(source) = self.get_source_by_id(oid.as_source_id()) {
1041 Some(OwnedGrantObject {
1042 owner: source.owner,
1043 object: Object::SourceId(oid.as_source_id()),
1044 })
1045 } else if let Some(sink) = self.get_sink_by_id(oid.as_sink_id()) {
1046 Some(OwnedGrantObject {
1047 owner: sink.owner,
1048 object: Object::SinkId(oid.as_sink_id()),
1049 })
1050 } else if let Some(view) = self.get_view_by_id(oid.as_view_id()) {
1051 Some(OwnedGrantObject {
1052 owner: view.owner,
1053 object: Object::ViewId(oid.as_view_id()),
1054 })
1055 } else if let Some(function) = self.get_function_by_id(oid.as_function_id()) {
1056 Some(OwnedGrantObject {
1057 owner: function.owner(),
1058 object: Object::FunctionId(oid.as_function_id()),
1059 })
1060 } else if let Some(subscription) = self.get_subscription_by_id(oid.as_subscription_id()) {
1061 Some(OwnedGrantObject {
1062 owner: subscription.owner,
1063 object: Object::SubscriptionId(oid.as_subscription_id()),
1064 })
1065 } else if let Some(connection) = self.get_connection_by_id(oid.as_connection_id()) {
1066 Some(OwnedGrantObject {
1067 owner: connection.owner,
1068 object: Object::ConnectionId(oid.as_connection_id()),
1069 })
1070 } else if let Some(secret) = self.get_secret_by_id(oid.as_secret_id()) {
1071 Some(OwnedGrantObject {
1072 owner: secret.owner,
1073 object: Object::SecretId(oid.as_secret_id()),
1074 })
1075 } else {
1076 None
1077 }
1078 }
1079
1080 pub fn contains_object(&self, oid: ObjectId) -> bool {
1081 self.table_by_id.contains_key(&oid.as_table_id())
1082 || self.index_by_id.contains_key(&oid.as_index_id())
1083 || self.source_by_id.contains_key(&oid.as_source_id())
1084 || self.sink_by_id.contains_key(&oid.as_sink_id())
1085 || self.view_by_id.contains_key(&oid.as_view_id())
1086 || self.function_by_id.contains_key(&oid.as_function_id())
1087 || self
1088 .subscription_by_id
1089 .contains_key(&oid.as_subscription_id())
1090 || self.connection_by_id.contains_key(&oid.as_connection_id())
1091 }
1092
1093 pub fn id(&self) -> SchemaId {
1094 self.id
1095 }
1096
1097 pub fn database_id(&self) -> DatabaseId {
1098 self.database_id
1099 }
1100
1101 pub fn name(&self) -> String {
1102 self.name.clone()
1103 }
1104}
1105
1106impl OwnedByUserCatalog for SchemaCatalog {
1107 fn owner(&self) -> UserId {
1108 self.owner
1109 }
1110}
1111
1112impl From<&PbSchema> for SchemaCatalog {
1113 fn from(schema: &PbSchema) -> Self {
1114 Self {
1115 id: schema.id,
1116 owner: schema.owner,
1117 name: schema.name.clone(),
1118 database_id: schema.database_id,
1119 table_by_name: HashMap::new(),
1120 table_by_id: HashMap::new(),
1121 source_by_name: HashMap::new(),
1122 source_by_id: HashMap::new(),
1123 sink_by_name: HashMap::new(),
1124 sink_by_id: HashMap::new(),
1125 table_incoming_sinks: HashMap::new(),
1126 index_by_name: HashMap::new(),
1127 index_by_id: HashMap::new(),
1128 indexes_by_table_id: HashMap::new(),
1129 system_table_by_name: HashMap::new(),
1130 view_by_name: HashMap::new(),
1131 view_by_id: HashMap::new(),
1132 function_registry: FunctionRegistry::default(),
1133 function_by_name: HashMap::new(),
1134 function_by_id: HashMap::new(),
1135 connection_by_name: HashMap::new(),
1136 connection_by_id: HashMap::new(),
1137 secret_by_name: HashMap::new(),
1138 secret_by_id: HashMap::new(),
1139 _secret_source_ref: HashMap::new(),
1140 _secret_sink_ref: HashMap::new(),
1141 connection_source_ref: HashMap::new(),
1142 connection_sink_ref: HashMap::new(),
1143 subscription_by_name: HashMap::new(),
1144 subscription_by_id: HashMap::new(),
1145 }
1146 }
1147}