1use std::collections::HashMap;
16use std::collections::hash_map::Entry::{Occupied, Vacant};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
/// In-memory catalog of every object registered in a single schema.
///
/// Most object kinds are indexed twice, once by name and once by id, and both
/// maps share the same `Arc` handle.
#[derive(Clone, Debug)]
pub struct SchemaCatalog {
    /// Identifier of this schema.
    id: SchemaId,
    /// Name of this schema.
    pub name: String,
    /// Identifier of the database this schema was created from.
    pub database_id: DatabaseId,
    /// Every `TableCatalog` of the schema keyed by name. The `iter_*` helpers
    /// filter this map into user tables, internal tables and materialized views.
    table_by_name: HashMap<String, Arc<TableCatalog>>,
    /// Same tables as `table_by_name`, keyed by id.
    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
    /// Sources keyed by name.
    source_by_name: HashMap<String, Arc<SourceCatalog>>,
    /// Sources keyed by id.
    source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
    /// Sinks keyed by name.
    sink_by_name: HashMap<String, Arc<SinkCatalog>>,
    /// Sinks keyed by id.
    sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
    /// Subscriptions keyed by name.
    subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
    /// Subscriptions keyed by id.
    subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
    /// Indexes keyed by name.
    index_by_name: HashMap<String, Arc<IndexCatalog>>,
    /// Indexes keyed by id.
    index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
    /// Indexes grouped by the id of their primary table.
    indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
    /// Views (including system views) keyed by name.
    view_by_name: HashMap<String, Arc<ViewCatalog>>,
    /// Views keyed by id.
    view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
    /// Signatures of the functions registered through `create_function`,
    /// used when resolving a call by name.
    function_registry: FunctionRegistry,
    /// Functions keyed by name, then by their argument types.
    function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
    /// Functions keyed by id.
    function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
    /// Connections keyed by name.
    connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
    /// Connections keyed by id.
    connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
    /// Secrets keyed by name.
    secret_by_name: HashMap<String, Arc<SecretCatalog>>,
    /// Secrets keyed by id.
    secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,

    /// Secret-to-source references; only initialised (empty) in the code visible here.
    _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
    /// Secret-to-sink references; only initialised (empty) in the code visible here.
    _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,

    /// Ids of the sources that reference each connection.
    connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
    /// Ids of the sinks that reference each connection.
    connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
    /// System tables keyed by name.
    system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
    /// Owner of the schema, as reported by `OwnedByUserCatalog::owner`.
    pub owner: u32,
}
84
85impl SchemaCatalog {
86 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
87 let name = prost.name.clone();
88 let id = prost.id.into();
89 let table: TableCatalog = prost.into();
90 let table_ref = Arc::new(table);
91
92 self.table_by_name
93 .try_insert(name, table_ref.clone())
94 .unwrap();
95 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
96 table_ref
97 }
98
99 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
100 self.system_table_by_name
101 .try_insert(sys_table.name.clone(), sys_table)
102 .unwrap();
103 }
104
105 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
106 self.view_by_name
107 .try_insert(sys_view.name().to_owned(), sys_view.clone())
108 .unwrap();
109 self.view_by_id
110 .try_insert(sys_view.id, sys_view.clone())
111 .unwrap();
112 }
113
114 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
115 let name = prost.name.clone();
116 let id = prost.id.into();
117 let table: TableCatalog = prost.into();
118 let table_ref = Arc::new(table);
119
120 let old_table = self.table_by_id.get(&id).unwrap();
121 if old_table.name() != name
123 && let Some(t) = self.table_by_name.get(old_table.name())
124 && t.id == id
125 {
126 self.table_by_name.remove(old_table.name());
127 }
128
129 self.table_by_name.insert(name, table_ref.clone());
130 self.table_by_id.insert(id, table_ref.clone());
131 table_ref
132 }
133
134 pub fn update_index(&mut self, prost: &PbIndex) {
135 let name = prost.name.clone();
136 let id = prost.id.into();
137 let old_index = self.index_by_id.get(&id).unwrap();
138 let index_table = self
139 .get_created_table_by_id(&prost.index_table_id.into())
140 .unwrap();
141 let primary_table = self
142 .get_created_table_by_id(&prost.primary_table_id.into())
143 .unwrap();
144 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
145 let index_ref = Arc::new(index);
146
147 if old_index.name != name
149 && let Some(idx) = self.index_by_name.get(&old_index.name)
150 && idx.id == id
151 {
152 self.index_by_name.remove(&old_index.name);
153 }
154 self.index_by_name.insert(name, index_ref.clone());
155 self.index_by_id.insert(id, index_ref.clone());
156
157 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
158 Occupied(mut entry) => {
159 let pos = entry
160 .get()
161 .iter()
162 .position(|x| x.id == index_ref.id)
163 .unwrap();
164 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
165 }
166 Vacant(_entry) => {
167 unreachable!()
168 }
169 };
170 }
171
172 pub fn drop_table(&mut self, id: TableId) {
173 if let Some(table_ref) = self.table_by_id.remove(&id) {
174 self.table_by_name.remove(&table_ref.name).unwrap();
175 self.indexes_by_table_id.remove(&table_ref.id);
176 } else {
177 tracing::warn!(
178 id = ?id.table_id,
179 "table not found when dropping, frontend might not be notified yet"
180 );
181 }
182 }
183
184 pub fn create_index(&mut self, prost: &PbIndex) {
185 let name = prost.name.clone();
186 let id = prost.id.into();
187 let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
188 let primary_table = self
189 .get_created_table_by_id(&prost.primary_table_id.into())
190 .unwrap();
191 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
192 let index_ref = Arc::new(index);
193
194 self.index_by_name
195 .try_insert(name, index_ref.clone())
196 .unwrap();
197 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
198 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
199 Occupied(mut entry) => {
200 entry.get_mut().push(index_ref);
201 }
202 Vacant(entry) => {
203 entry.insert(vec![index_ref]);
204 }
205 };
206 }
207
208 pub fn drop_index(&mut self, id: IndexId) {
209 let index_ref = self.index_by_id.remove(&id).unwrap();
210 self.index_by_name.remove(&index_ref.name).unwrap();
211 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
212 Occupied(mut entry) => {
213 let pos = entry
214 .get_mut()
215 .iter()
216 .position(|x| x.id == index_ref.id)
217 .unwrap();
218 entry.get_mut().remove(pos);
219 }
220 Vacant(_entry) => (),
221 };
222 }
223
224 pub fn create_source(&mut self, prost: &PbSource) {
225 let name = prost.name.clone();
226 let id = prost.id;
227 let source = SourceCatalog::from(prost);
228 let source_ref = Arc::new(source);
229
230 if let Some(connection_id) = source_ref.connection_id {
231 self.connection_source_ref
232 .entry(connection_id)
233 .and_modify(|sources| sources.push(source_ref.id))
234 .or_insert(vec![source_ref.id]);
235 }
236
237 self.source_by_name
238 .try_insert(name, source_ref.clone())
239 .unwrap();
240 self.source_by_id.try_insert(id, source_ref).unwrap();
241 }
242
243 pub fn drop_source(&mut self, id: SourceId) {
244 let source_ref = self.source_by_id.remove(&id).unwrap();
245 self.source_by_name.remove(&source_ref.name).unwrap();
246 if let Some(connection_id) = source_ref.connection_id
247 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
248 {
249 let source_ids = e.get_mut();
250 source_ids.retain_mut(|sid| *sid != id);
251 if source_ids.is_empty() {
252 e.remove_entry();
253 }
254 }
255 }
256
257 pub fn update_source(&mut self, prost: &PbSource) {
258 let name = prost.name.clone();
259 let id = prost.id;
260 let source = SourceCatalog::from(prost);
261 let source_ref = Arc::new(source);
262
263 let old_source = self.source_by_id.get(&id).unwrap();
264 if old_source.name != name
266 && let Some(src) = self.source_by_name.get(&old_source.name)
267 && src.id == id
268 {
269 self.source_by_name.remove(&old_source.name);
270 }
271
272 self.source_by_name.insert(name, source_ref.clone());
273 self.source_by_id.insert(id, source_ref);
274 }
275
276 pub fn create_sink(&mut self, prost: &PbSink) {
277 let name = prost.name.clone();
278 let id = prost.id;
279 let sink = SinkCatalog::from(prost);
280 let sink_ref = Arc::new(sink);
281
282 if let Some(connection_id) = sink_ref.connection_id {
283 self.connection_sink_ref
284 .entry(connection_id.0)
285 .and_modify(|sinks| sinks.push(id))
286 .or_insert(vec![id]);
287 }
288
289 self.sink_by_name
290 .try_insert(name, sink_ref.clone())
291 .unwrap();
292 self.sink_by_id.try_insert(id, sink_ref).unwrap();
293 }
294
295 pub fn drop_sink(&mut self, id: SinkId) {
296 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
297 self.sink_by_name.remove(&sink_ref.name).unwrap();
298 if let Some(connection_id) = sink_ref.connection_id
299 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
300 {
301 let sink_ids = e.get_mut();
302 sink_ids.retain_mut(|sid| *sid != id);
303 if sink_ids.is_empty() {
304 e.remove_entry();
305 }
306 }
307 } else {
308 tracing::warn!(
309 id,
310 "sink not found when dropping, frontend might not be notified yet"
311 );
312 }
313 }
314
315 pub fn update_sink(&mut self, prost: &PbSink) {
316 let name = prost.name.clone();
317 let id = prost.id;
318 let sink = SinkCatalog::from(prost);
319 let sink_ref = Arc::new(sink);
320
321 let old_sink = self.sink_by_id.get(&id).unwrap();
322 if old_sink.name != name
324 && let Some(s) = self.sink_by_name.get(&old_sink.name)
325 && s.id.sink_id == id
326 {
327 self.sink_by_name.remove(&old_sink.name);
328 }
329
330 self.sink_by_name.insert(name, sink_ref.clone());
331 self.sink_by_id.insert(id, sink_ref);
332 }
333
334 pub fn create_subscription(&mut self, prost: &PbSubscription) {
335 let name = prost.name.clone();
336 let id = prost.id;
337 let subscription_catalog = SubscriptionCatalog::from(prost);
338 let subscription_ref = Arc::new(subscription_catalog);
339
340 self.subscription_by_name
341 .try_insert(name, subscription_ref.clone())
342 .unwrap();
343 self.subscription_by_id
344 .try_insert(id, subscription_ref)
345 .unwrap();
346 }
347
348 pub fn drop_subscription(&mut self, id: SubscriptionId) {
349 let subscription_ref = self.subscription_by_id.remove(&id);
350 if let Some(subscription_ref) = subscription_ref {
351 self.subscription_by_name.remove(&subscription_ref.name);
352 }
353 }
354
355 pub fn update_subscription(&mut self, prost: &PbSubscription) {
356 let name = prost.name.clone();
357 let id = prost.id;
358 let subscription = SubscriptionCatalog::from(prost);
359 let subscription_ref = Arc::new(subscription);
360
361 let old_subscription = self.subscription_by_id.get(&id).unwrap();
362 if old_subscription.name != name
364 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
365 && s.id.subscription_id == id
366 {
367 self.subscription_by_name.remove(&old_subscription.name);
368 }
369
370 self.subscription_by_name
371 .insert(name, subscription_ref.clone());
372 self.subscription_by_id.insert(id, subscription_ref);
373 }
374
375 pub fn create_view(&mut self, prost: &PbView) {
376 let name = prost.name.clone();
377 let id = prost.id;
378 let view = ViewCatalog::from(prost);
379 let view_ref = Arc::new(view);
380
381 self.view_by_name
382 .try_insert(name, view_ref.clone())
383 .unwrap();
384 self.view_by_id.try_insert(id, view_ref).unwrap();
385 }
386
387 pub fn drop_view(&mut self, id: ViewId) {
388 let view_ref = self.view_by_id.remove(&id).unwrap();
389 self.view_by_name.remove(&view_ref.name).unwrap();
390 }
391
392 pub fn update_view(&mut self, prost: &PbView) {
393 let name = prost.name.clone();
394 let id = prost.id;
395 let view = ViewCatalog::from(prost);
396 let view_ref = Arc::new(view);
397
398 let old_view = self.view_by_id.get(&id).unwrap();
399 if old_view.name != name
401 && let Some(v) = self.view_by_name.get(old_view.name())
402 && v.id == id
403 {
404 self.view_by_name.remove(&old_view.name);
405 }
406
407 self.view_by_name.insert(name, view_ref.clone());
408 self.view_by_id.insert(id, view_ref);
409 }
410
411 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
412 FuncSign {
413 name: FuncName::Udf(func.name.clone()),
414 inputs_type: func
415 .arg_types
416 .iter()
417 .map(|t| t.clone().into())
418 .collect_vec(),
419 variadic: false,
420 ret_type: func.return_type.clone().into(),
421 build: FuncBuilder::Udf,
422 type_infer: |_| Ok(DataType::Boolean),
424 deprecated: false,
425 }
426 }
427
428 pub fn create_function(&mut self, prost: &PbFunction) {
429 let name = prost.name.clone();
430 let id = prost.id;
431 let function = FunctionCatalog::from(prost);
432 let args = function.arg_types.clone();
433 let function_ref = Arc::new(function);
434
435 self.function_registry
436 .insert(Self::get_func_sign(&function_ref));
437 self.function_by_name
438 .entry(name)
439 .or_default()
440 .try_insert(args, function_ref.clone())
441 .expect("function already exists with same argument types");
442 self.function_by_id
443 .try_insert(id.into(), function_ref)
444 .expect("function id exists");
445 }
446
447 pub fn drop_function(&mut self, id: FunctionId) {
448 let function_ref = self
449 .function_by_id
450 .remove(&id)
451 .expect("function not found by id");
452
453 self.function_registry
454 .remove(Self::get_func_sign(&function_ref))
455 .expect("function not found in registry");
456
457 self.function_by_name
458 .get_mut(&function_ref.name)
459 .expect("function not found by name")
460 .remove(&function_ref.arg_types)
461 .expect("function not found by argument types");
462 }
463
464 pub fn update_function(&mut self, prost: &PbFunction) {
465 let name = prost.name.clone();
466 let id = prost.id.into();
467 let function = FunctionCatalog::from(prost);
468 let function_ref = Arc::new(function);
469
470 let old_function_by_id = self.function_by_id.get(&id).unwrap();
471 let old_function_by_name = self
472 .function_by_name
473 .get_mut(&old_function_by_id.name)
474 .unwrap();
475 if old_function_by_id.name != name
477 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
478 && f.id == id
479 {
480 old_function_by_name.remove(&old_function_by_id.arg_types);
481 if old_function_by_name.is_empty() {
482 self.function_by_name.remove(&old_function_by_id.name);
483 }
484 }
485
486 self.function_by_name
487 .entry(name)
488 .or_default()
489 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
490 self.function_by_id.insert(id, function_ref);
491 }
492
493 pub fn create_connection(&mut self, prost: &PbConnection) {
494 let name = prost.name.clone();
495 let id = prost.id;
496 let connection = ConnectionCatalog::from(prost);
497 let connection_ref = Arc::new(connection);
498 self.connection_by_name
499 .try_insert(name, connection_ref.clone())
500 .unwrap();
501 self.connection_by_id
502 .try_insert(id, connection_ref)
503 .unwrap();
504 }
505
506 pub fn update_connection(&mut self, prost: &PbConnection) {
507 let name = prost.name.clone();
508 let id = prost.id;
509 let connection = ConnectionCatalog::from(prost);
510 let connection_ref = Arc::new(connection);
511
512 let old_connection = self.connection_by_id.get(&id).unwrap();
513 if old_connection.name != name
515 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
516 && conn.id == id
517 {
518 self.connection_by_name.remove(&old_connection.name);
519 }
520
521 self.connection_by_name.insert(name, connection_ref.clone());
522 self.connection_by_id.insert(id, connection_ref);
523 }
524
525 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
526 let connection_ref = self
527 .connection_by_id
528 .remove(&connection_id)
529 .expect("connection not found by id");
530 self.connection_by_name
531 .remove(&connection_ref.name)
532 .expect("connection not found by name");
533 }
534
535 pub fn create_secret(&mut self, prost: &PbSecret) {
536 let name = prost.name.clone();
537 let id = SecretId::new(prost.id);
538 let secret = SecretCatalog::from(prost);
539 let secret_ref = Arc::new(secret);
540
541 self.secret_by_id
542 .try_insert(id, secret_ref.clone())
543 .unwrap();
544 self.secret_by_name
545 .try_insert(name, secret_ref.clone())
546 .unwrap();
547 }
548
549 pub fn update_secret(&mut self, prost: &PbSecret) {
550 let name = prost.name.clone();
551 let id = SecretId::new(prost.id);
552 let secret = SecretCatalog::from(prost);
553 let secret_ref = Arc::new(secret);
554
555 let old_secret = self.secret_by_id.get(&id).unwrap();
556 if old_secret.name != name
558 && let Some(s) = self.secret_by_name.get(&old_secret.name)
559 && s.id == id
560 {
561 self.secret_by_name.remove(&old_secret.name);
562 }
563
564 self.secret_by_name.insert(name, secret_ref.clone());
565 self.secret_by_id.insert(id, secret_ref);
566 }
567
568 pub fn drop_secret(&mut self, secret_id: SecretId) {
569 let secret_ref = self
570 .secret_by_id
571 .remove(&secret_id)
572 .expect("secret not found by id");
573 self.secret_by_name
574 .remove(&secret_ref.name)
575 .expect("secret not found by name");
576 }
577
/// Iterates over every entry of the by-name table map, without any filtering.
pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
    self.table_by_name.values()
}
581
582 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
583 self.table_by_name.values().filter(|v| v.is_user_table())
584 }
585
586 pub fn iter_user_table_with_acl<'a>(
587 &'a self,
588 user: &'a UserCatalog,
589 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
590 self.table_by_name.values().filter(|v| {
591 v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
592 })
593 }
594
595 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
596 self.table_by_name
597 .values()
598 .filter(|v| v.is_internal_table())
599 }
600
601 pub fn iter_internal_table_with_acl<'a>(
602 &'a self,
603 user: &'a UserCatalog,
604 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
605 self.table_by_name.values().filter(|v| {
606 v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
607 })
608 }
609
610 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
612 self.table_by_name
613 .values()
614 .filter(|v| !v.is_internal_table())
615 }
616
617 pub fn iter_table_mv_indices_with_acl<'a>(
618 &'a self,
619 user: &'a UserCatalog,
620 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
621 self.table_by_name.values().filter(|v| {
622 !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
623 })
624 }
625
626 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
628 self.table_by_name.values().filter(|v| v.is_mview())
629 }
630
631 pub fn iter_all_mvs_with_acl<'a>(
632 &'a self,
633 user: &'a UserCatalog,
634 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
635 self.table_by_name.values().filter(|v| {
636 v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
637 })
638 }
639
640 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
642 self.table_by_name
643 .values()
644 .filter(|v| v.is_mview() && v.is_created())
645 }
646
647 pub fn iter_created_mvs_with_acl<'a>(
648 &'a self,
649 user: &'a UserCatalog,
650 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
651 self.table_by_name.values().filter(|v| {
652 v.is_mview()
653 && v.is_created()
654 && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
655 })
656 }
657
/// Iterates over every registered index.
pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
    self.index_by_name.values()
}
662
663 pub fn iter_index_with_acl<'a>(
664 &'a self,
665 user: &'a UserCatalog,
666 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
667 self.index_by_name
668 .values()
669 .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
670 }
671
/// Iterates over every registered source.
pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
    self.source_by_name.values()
}
676
677 pub fn iter_source_with_acl<'a>(
678 &'a self,
679 user: &'a UserCatalog,
680 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
681 self.source_by_name
682 .values()
683 .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
684 }
685
/// Iterates over every registered sink.
pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
    self.sink_by_name.values()
}
689
690 pub fn iter_sink_with_acl<'a>(
691 &'a self,
692 user: &'a UserCatalog,
693 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
694 self.sink_by_name
695 .values()
696 .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
697 }
698
/// Iterates over every registered subscription.
pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
    self.subscription_by_name.values()
}
702
703 pub fn iter_subscription_with_acl<'a>(
704 &'a self,
705 user: &'a UserCatalog,
706 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
707 self.subscription_by_name.values().filter(|s| {
708 has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
709 })
710 }
711
/// Iterates over every registered view.
pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
    self.view_by_name.values()
}
715
716 pub fn iter_view_with_acl<'a>(
717 &'a self,
718 user: &'a UserCatalog,
719 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
720 self.view_by_name
721 .values()
722 .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
723 }
724
725 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
726 self.function_by_name.values().flat_map(|v| v.values())
727 }
728
/// Iterates over every registered connection.
pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
    self.connection_by_name.values()
}
732
/// Iterates over every registered secret.
pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
    self.secret_by_name.values()
}
736
/// Iterates over every registered system table.
pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
    self.system_table_by_name.values()
}
740
741 pub fn get_table_by_name(
742 &self,
743 table_name: &str,
744 bind_creating_relations: bool,
745 ) -> Option<&Arc<TableCatalog>> {
746 self.table_by_name
747 .get(table_name)
748 .filter(|&table| bind_creating_relations || table.is_created())
749 }
750
751 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
752 self.get_table_by_name(table_name, true)
753 }
754
755 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
756 self.get_table_by_name(table_name, false)
757 }
758
/// Looks up a table by id, whatever its creation status.
pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
    self.table_by_id.get(table_id)
}
762
763 pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
764 self.table_by_id
765 .get(table_id)
766 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
767 }
768
/// Looks up a view by name.
pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
    self.view_by_name.get(view_name)
}
772
/// Looks up a view by id.
pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
    self.view_by_id.get(view_id)
}
776
/// Looks up a source by name.
pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
    self.source_by_name.get(source_name)
}
780
/// Looks up a source by id.
pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
    self.source_by_id.get(source_id)
}
784
785 pub fn get_sink_by_name(
786 &self,
787 sink_name: &str,
788 bind_creating: bool,
789 ) -> Option<&Arc<SinkCatalog>> {
790 self.sink_by_name
791 .get(sink_name)
792 .filter(|s| bind_creating || s.is_created())
793 }
794
795 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
796 self.get_sink_by_name(sink_name, true)
797 }
798
799 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
800 self.get_sink_by_name(sink_name, false)
801 }
802
/// Looks up a sink by id, whatever its creation status.
pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
    self.sink_by_id.get(sink_id)
}
806
/// Looks up a subscription by name.
pub fn get_subscription_by_name(
    &self,
    subscription_name: &str,
) -> Option<&Arc<SubscriptionCatalog>> {
    self.subscription_by_name.get(subscription_name)
}
813
/// Looks up a subscription by id.
pub fn get_subscription_by_id(
    &self,
    subscription_id: &SubscriptionId,
) -> Option<&Arc<SubscriptionCatalog>> {
    self.subscription_by_id.get(subscription_id)
}
820
/// Looks up an index by name.
pub fn get_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
    self.index_by_name.get(index_name)
}
824
/// Looks up an index by id.
pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
    self.index_by_id.get(index_id)
}
828
829 pub fn get_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
831 self.indexes_by_table_id
832 .get(table_id)
833 .cloned()
834 .unwrap_or_default()
835 }
836
/// Looks up a system table by name.
pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
    self.system_table_by_name.get(table_name)
}
840
841 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
842 self.table_by_id
843 .get(&table_id)
844 .map(|table| table.name.clone())
845 }
846
/// Looks up a function by id.
pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
    self.function_by_id.get(&function_id)
}
850
851 pub fn get_function_by_name_inputs(
852 &self,
853 name: &str,
854 inputs: &mut [ExprImpl],
855 ) -> Option<&Arc<FunctionCatalog>> {
856 infer_type_with_sigmap(
857 FuncName::Udf(name.to_owned()),
858 inputs,
859 &self.function_registry,
860 )
861 .ok()?;
862 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
863 self.function_by_name.get(name)?.get(&args)
864 }
865
866 pub fn get_function_by_name_args(
867 &self,
868 name: &str,
869 args: &[DataType],
870 ) -> Option<&Arc<FunctionCatalog>> {
871 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
872 let func = infer_type_name(
873 &self.function_registry,
874 FuncName::Udf(name.to_owned()),
875 &args,
876 )
877 .ok()?;
878
879 let args = func
880 .inputs_type
881 .iter()
882 .filter_map(|x| {
883 if let SigDataType::Exact(t) = x {
884 Some(t.clone())
885 } else {
886 None
887 }
888 })
889 .collect_vec();
890
891 self.function_by_name.get(name)?.get(&args)
892 }
893
894 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
895 let functions = self.function_by_name.get(name)?;
896 if functions.is_empty() {
897 return None;
898 }
899 Some(functions.values().collect())
900 }
901
/// Looks up a connection by id.
pub fn get_connection_by_id(
    &self,
    connection_id: &ConnectionId,
) -> Option<&Arc<ConnectionCatalog>> {
    self.connection_by_id.get(connection_id)
}
908
/// Looks up a connection by name.
pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
    self.connection_by_name.get(connection_name)
}
912
/// Looks up a secret by name.
pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
    self.secret_by_name.get(secret_name)
}
916
/// Looks up a secret by id.
pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
    self.secret_by_id.get(secret_id)
}
920
921 pub fn get_source_ids_by_connection(
923 &self,
924 connection_id: ConnectionId,
925 ) -> Option<Vec<SourceId>> {
926 self.connection_source_ref
927 .get(&connection_id)
928 .map(|c| c.to_owned())
929 }
930
931 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
933 self.connection_sink_ref
934 .get(&connection_id)
935 .map(|s| s.to_owned())
936 }
937
938 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
939 #[allow(clippy::manual_map)]
940 if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
941 Some(OwnedGrantObject {
942 owner: table.owner,
943 object: Object::TableId(oid),
944 })
945 } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
946 Some(OwnedGrantObject {
947 owner: index.owner(),
948 object: Object::TableId(oid),
949 })
950 } else if let Some(source) = self.get_source_by_id(&oid) {
951 Some(OwnedGrantObject {
952 owner: source.owner,
953 object: Object::SourceId(oid),
954 })
955 } else if let Some(sink) = self.get_sink_by_id(&oid) {
956 Some(OwnedGrantObject {
957 owner: sink.owner.user_id,
958 object: Object::SinkId(oid),
959 })
960 } else if let Some(view) = self.get_view_by_id(&oid) {
961 Some(OwnedGrantObject {
962 owner: view.owner,
963 object: Object::ViewId(oid),
964 })
965 } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
966 Some(OwnedGrantObject {
967 owner: function.owner(),
968 object: Object::FunctionId(oid),
969 })
970 } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
971 Some(OwnedGrantObject {
972 owner: subscription.owner.user_id,
973 object: Object::SubscriptionId(oid),
974 })
975 } else if let Some(connection) = self.get_connection_by_id(&oid) {
976 Some(OwnedGrantObject {
977 owner: connection.owner,
978 object: Object::ConnectionId(oid),
979 })
980 } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
981 Some(OwnedGrantObject {
982 owner: secret.owner,
983 object: Object::SecretId(oid),
984 })
985 } else {
986 None
987 }
988 }
989
990 pub fn contains_object(&self, oid: u32) -> bool {
991 self.table_by_id.contains_key(&TableId::new(oid))
992 || self.index_by_id.contains_key(&IndexId::new(oid))
993 || self.source_by_id.contains_key(&oid)
994 || self.sink_by_id.contains_key(&oid)
995 || self.view_by_id.contains_key(&oid)
996 || self.function_by_id.contains_key(&FunctionId::new(oid))
997 || self.subscription_by_id.contains_key(&oid)
998 || self.connection_by_id.contains_key(&oid)
999 }
1000
/// Returns the id of this schema.
pub fn id(&self) -> SchemaId {
    self.id
}
1004
/// Returns the id of the database recorded for this schema.
pub fn database_id(&self) -> DatabaseId {
    self.database_id
}
1008
/// Returns an owned copy of the schema name.
pub fn name(&self) -> String {
    self.name.clone()
}
1012}
1013
impl OwnedByUserCatalog for SchemaCatalog {
    /// Returns the owner stored on the schema.
    fn owner(&self) -> UserId {
        self.owner
    }
}
1019
1020impl From<&PbSchema> for SchemaCatalog {
1021 fn from(schema: &PbSchema) -> Self {
1022 Self {
1023 id: schema.id,
1024 owner: schema.owner,
1025 name: schema.name.clone(),
1026 database_id: schema.database_id,
1027 table_by_name: HashMap::new(),
1028 table_by_id: HashMap::new(),
1029 source_by_name: HashMap::new(),
1030 source_by_id: HashMap::new(),
1031 sink_by_name: HashMap::new(),
1032 sink_by_id: HashMap::new(),
1033 index_by_name: HashMap::new(),
1034 index_by_id: HashMap::new(),
1035 indexes_by_table_id: HashMap::new(),
1036 system_table_by_name: HashMap::new(),
1037 view_by_name: HashMap::new(),
1038 view_by_id: HashMap::new(),
1039 function_registry: FunctionRegistry::default(),
1040 function_by_name: HashMap::new(),
1041 function_by_id: HashMap::new(),
1042 connection_by_name: HashMap::new(),
1043 connection_by_id: HashMap::new(),
1044 secret_by_name: HashMap::new(),
1045 secret_by_id: HashMap::new(),
1046 _secret_source_ref: HashMap::new(),
1047 _secret_sink_ref: HashMap::new(),
1048 connection_source_ref: HashMap::new(),
1049 connection_sink_ref: HashMap::new(),
1050 subscription_by_name: HashMap::new(),
1051 subscription_by_id: HashMap::new(),
1052 }
1053 }
1054}