1use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47 id: SchemaId,
48 pub name: String,
49 pub database_id: DatabaseId,
50 table_by_name: HashMap<String, Arc<TableCatalog>>,
52 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54 source_by_name: HashMap<String, Arc<SourceCatalog>>,
55 source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56 sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57 sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58 table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60 subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61 subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62 index_by_name: HashMap<String, Arc<IndexCatalog>>,
63 index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64 indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65 view_by_name: HashMap<String, Arc<ViewCatalog>>,
66 view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67 function_registry: FunctionRegistry,
68 function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69 function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70 connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71 connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72 secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73 secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75 _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76 _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78 connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80 connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82 system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84 pub owner: u32,
85}
86
87impl SchemaCatalog {
88 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89 let name = prost.name.clone();
90 let id = prost.id.into();
91 let table: TableCatalog = prost.into();
92 let table_ref = Arc::new(table);
93
94 self.table_by_name
95 .try_insert(name, table_ref.clone())
96 .unwrap();
97 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98 table_ref
99 }
100
101 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102 self.system_table_by_name
103 .try_insert(sys_table.name.clone(), sys_table)
104 .unwrap();
105 }
106
107 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108 self.view_by_name
109 .try_insert(sys_view.name().to_owned(), sys_view.clone())
110 .unwrap();
111 self.view_by_id
112 .try_insert(sys_view.id, sys_view.clone())
113 .unwrap();
114 }
115
116 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117 let name = prost.name.clone();
118 let id = prost.id.into();
119 let table: TableCatalog = prost.into();
120 let table_ref = Arc::new(table);
121
122 let old_table = self.table_by_id.get(&id).unwrap();
123 if old_table.name() != name
125 && let Some(t) = self.table_by_name.get(old_table.name())
126 && t.id == id
127 {
128 self.table_by_name.remove(old_table.name());
129 }
130
131 self.table_by_name.insert(name, table_ref.clone());
132 self.table_by_id.insert(id, table_ref.clone());
133 table_ref
134 }
135
136 pub fn update_index(&mut self, prost: &PbIndex) {
137 let name = prost.name.clone();
138 let id = prost.id.into();
139 let old_index = self.index_by_id.get(&id).unwrap();
140 let index_table = self
141 .get_created_table_by_id(&prost.index_table_id.into())
142 .unwrap();
143 let primary_table = self
144 .get_created_table_by_id(&prost.primary_table_id.into())
145 .unwrap();
146 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
147 let index_ref = Arc::new(index);
148
149 if old_index.name != name
151 && let Some(idx) = self.index_by_name.get(&old_index.name)
152 && idx.id == id
153 {
154 self.index_by_name.remove(&old_index.name);
155 }
156 self.index_by_name.insert(name, index_ref.clone());
157 self.index_by_id.insert(id, index_ref.clone());
158
159 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
160 Occupied(mut entry) => {
161 let pos = entry
162 .get()
163 .iter()
164 .position(|x| x.id == index_ref.id)
165 .unwrap();
166 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
167 }
168 Vacant(_entry) => {
169 unreachable!()
170 }
171 };
172 }
173
174 pub fn drop_table(&mut self, id: TableId) {
175 if let Some(table_ref) = self.table_by_id.remove(&id) {
176 self.table_by_name.remove(&table_ref.name).unwrap();
177 self.indexes_by_table_id.remove(&table_ref.id);
178 } else {
179 tracing::warn!(
180 id = ?id.table_id,
181 "table not found when dropping, frontend might not be notified yet"
182 );
183 }
184 }
185
186 pub fn create_index(&mut self, prost: &PbIndex) {
187 let name = prost.name.clone();
188 let id = prost.id.into();
189 let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
190 let primary_table = self
191 .get_created_table_by_id(&prost.primary_table_id.into())
192 .unwrap();
193 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
194 let index_ref = Arc::new(index);
195
196 self.index_by_name
197 .try_insert(name, index_ref.clone())
198 .unwrap();
199 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
200 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
201 Occupied(mut entry) => {
202 entry.get_mut().push(index_ref);
203 }
204 Vacant(entry) => {
205 entry.insert(vec![index_ref]);
206 }
207 };
208 }
209
210 pub fn drop_index(&mut self, id: IndexId) {
211 let index_ref = self.index_by_id.remove(&id).unwrap();
212 self.index_by_name.remove(&index_ref.name).unwrap();
213 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
214 Occupied(mut entry) => {
215 let pos = entry
216 .get_mut()
217 .iter()
218 .position(|x| x.id == index_ref.id)
219 .unwrap();
220 entry.get_mut().remove(pos);
221 }
222 Vacant(_entry) => (),
223 };
224 }
225
226 pub fn create_source(&mut self, prost: &PbSource) {
227 let name = prost.name.clone();
228 let id = prost.id;
229 let source = SourceCatalog::from(prost);
230 let source_ref = Arc::new(source);
231
232 if let Some(connection_id) = source_ref.connection_id {
233 self.connection_source_ref
234 .entry(connection_id)
235 .and_modify(|sources| sources.push(source_ref.id))
236 .or_insert(vec![source_ref.id]);
237 }
238
239 self.source_by_name
240 .try_insert(name, source_ref.clone())
241 .unwrap();
242 self.source_by_id.try_insert(id, source_ref).unwrap();
243 }
244
245 pub fn drop_source(&mut self, id: SourceId) {
246 let source_ref = self.source_by_id.remove(&id).unwrap();
247 self.source_by_name.remove(&source_ref.name).unwrap();
248 if let Some(connection_id) = source_ref.connection_id
249 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
250 {
251 let source_ids = e.get_mut();
252 source_ids.retain_mut(|sid| *sid != id);
253 if source_ids.is_empty() {
254 e.remove_entry();
255 }
256 }
257 }
258
259 pub fn update_source(&mut self, prost: &PbSource) {
260 let name = prost.name.clone();
261 let id = prost.id;
262 let source = SourceCatalog::from(prost);
263 let source_ref = Arc::new(source);
264
265 let old_source = self.source_by_id.get(&id).unwrap();
266 if old_source.name != name
268 && let Some(src) = self.source_by_name.get(&old_source.name)
269 && src.id == id
270 {
271 self.source_by_name.remove(&old_source.name);
272 }
273
274 self.source_by_name.insert(name, source_ref.clone());
275 self.source_by_id.insert(id, source_ref);
276 }
277
278 pub fn create_sink(&mut self, prost: &PbSink) {
279 let name = prost.name.clone();
280 let id = prost.id;
281 let sink = SinkCatalog::from(prost);
282 let sink_ref = Arc::new(sink);
283
284 if let Some(connection_id) = sink_ref.connection_id {
285 self.connection_sink_ref
286 .entry(connection_id.0)
287 .and_modify(|sinks| sinks.push(id))
288 .or_insert(vec![id]);
289 }
290
291 if let Some(target_table) = sink_ref.target_table {
292 assert!(
293 self.table_incoming_sinks
294 .entry(target_table)
295 .or_default()
296 .insert(sink_ref.id.sink_id)
297 );
298 }
299
300 self.sink_by_name
301 .try_insert(name, sink_ref.clone())
302 .unwrap();
303 self.sink_by_id.try_insert(id, sink_ref).unwrap();
304 }
305
306 pub fn drop_sink(&mut self, id: SinkId) {
307 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
308 self.sink_by_name.remove(&sink_ref.name).unwrap();
309 if let Some(connection_id) = sink_ref.connection_id
310 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
311 {
312 let sink_ids = e.get_mut();
313 sink_ids.retain_mut(|sid| *sid != id);
314 if sink_ids.is_empty() {
315 e.remove_entry();
316 }
317 }
318 if let Some(target_table) = sink_ref.target_table {
319 let incoming_sinks = self
320 .table_incoming_sinks
321 .get_mut(&target_table)
322 .expect("should exists");
323 assert!(incoming_sinks.remove(&sink_ref.id.sink_id));
324 if incoming_sinks.is_empty() {
325 self.table_incoming_sinks.remove(&target_table);
326 }
327 }
328 } else {
329 tracing::warn!(
330 id,
331 "sink not found when dropping, frontend might not be notified yet"
332 );
333 }
334 }
335
336 pub fn update_sink(&mut self, prost: &PbSink) {
337 let name = prost.name.clone();
338 let id = prost.id;
339 let sink = SinkCatalog::from(prost);
340 let sink_ref = Arc::new(sink);
341
342 let old_sink = self.sink_by_id.get(&id).unwrap();
343 assert_eq!(sink_ref.target_table, old_sink.target_table);
344 if old_sink.name != name
346 && let Some(s) = self.sink_by_name.get(&old_sink.name)
347 && s.id.sink_id == id
348 {
349 self.sink_by_name.remove(&old_sink.name);
350 }
351
352 self.sink_by_name.insert(name, sink_ref.clone());
353 self.sink_by_id.insert(id, sink_ref);
354 }
355
356 pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
357 self.table_incoming_sinks.get(&table_id)
358 }
359
360 pub fn create_subscription(&mut self, prost: &PbSubscription) {
361 let name = prost.name.clone();
362 let id = prost.id;
363 let subscription_catalog = SubscriptionCatalog::from(prost);
364 let subscription_ref = Arc::new(subscription_catalog);
365
366 self.subscription_by_name
367 .try_insert(name, subscription_ref.clone())
368 .unwrap();
369 self.subscription_by_id
370 .try_insert(id, subscription_ref)
371 .unwrap();
372 }
373
374 pub fn drop_subscription(&mut self, id: SubscriptionId) {
375 let subscription_ref = self.subscription_by_id.remove(&id);
376 if let Some(subscription_ref) = subscription_ref {
377 self.subscription_by_name.remove(&subscription_ref.name);
378 }
379 }
380
381 pub fn update_subscription(&mut self, prost: &PbSubscription) {
382 let name = prost.name.clone();
383 let id = prost.id;
384 let subscription = SubscriptionCatalog::from(prost);
385 let subscription_ref = Arc::new(subscription);
386
387 let old_subscription = self.subscription_by_id.get(&id).unwrap();
388 if old_subscription.name != name
390 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
391 && s.id.subscription_id == id
392 {
393 self.subscription_by_name.remove(&old_subscription.name);
394 }
395
396 self.subscription_by_name
397 .insert(name, subscription_ref.clone());
398 self.subscription_by_id.insert(id, subscription_ref);
399 }
400
401 pub fn create_view(&mut self, prost: &PbView) {
402 let name = prost.name.clone();
403 let id = prost.id;
404 let view = ViewCatalog::from(prost);
405 let view_ref = Arc::new(view);
406
407 self.view_by_name
408 .try_insert(name, view_ref.clone())
409 .unwrap();
410 self.view_by_id.try_insert(id, view_ref).unwrap();
411 }
412
413 pub fn drop_view(&mut self, id: ViewId) {
414 let view_ref = self.view_by_id.remove(&id).unwrap();
415 self.view_by_name.remove(&view_ref.name).unwrap();
416 }
417
418 pub fn update_view(&mut self, prost: &PbView) {
419 let name = prost.name.clone();
420 let id = prost.id;
421 let view = ViewCatalog::from(prost);
422 let view_ref = Arc::new(view);
423
424 let old_view = self.view_by_id.get(&id).unwrap();
425 if old_view.name != name
427 && let Some(v) = self.view_by_name.get(old_view.name())
428 && v.id == id
429 {
430 self.view_by_name.remove(&old_view.name);
431 }
432
433 self.view_by_name.insert(name, view_ref.clone());
434 self.view_by_id.insert(id, view_ref);
435 }
436
437 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
438 FuncSign {
439 name: FuncName::Udf(func.name.clone()),
440 inputs_type: func
441 .arg_types
442 .iter()
443 .map(|t| t.clone().into())
444 .collect_vec(),
445 variadic: false,
446 ret_type: func.return_type.clone().into(),
447 build: FuncBuilder::Udf,
448 type_infer: |_| Ok(DataType::Boolean),
450 deprecated: false,
451 }
452 }
453
454 pub fn create_function(&mut self, prost: &PbFunction) {
455 let name = prost.name.clone();
456 let id = prost.id;
457 let function = FunctionCatalog::from(prost);
458 let args = function.arg_types.clone();
459 let function_ref = Arc::new(function);
460
461 self.function_registry
462 .insert(Self::get_func_sign(&function_ref));
463 self.function_by_name
464 .entry(name)
465 .or_default()
466 .try_insert(args, function_ref.clone())
467 .expect("function already exists with same argument types");
468 self.function_by_id
469 .try_insert(id.into(), function_ref)
470 .expect("function id exists");
471 }
472
473 pub fn drop_function(&mut self, id: FunctionId) {
474 let function_ref = self
475 .function_by_id
476 .remove(&id)
477 .expect("function not found by id");
478
479 self.function_registry
480 .remove(Self::get_func_sign(&function_ref))
481 .expect("function not found in registry");
482
483 self.function_by_name
484 .get_mut(&function_ref.name)
485 .expect("function not found by name")
486 .remove(&function_ref.arg_types)
487 .expect("function not found by argument types");
488 }
489
490 pub fn update_function(&mut self, prost: &PbFunction) {
491 let name = prost.name.clone();
492 let id = prost.id.into();
493 let function = FunctionCatalog::from(prost);
494 let function_ref = Arc::new(function);
495
496 let old_function_by_id = self.function_by_id.get(&id).unwrap();
497 let old_function_by_name = self
498 .function_by_name
499 .get_mut(&old_function_by_id.name)
500 .unwrap();
501 if old_function_by_id.name != name
503 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
504 && f.id == id
505 {
506 old_function_by_name.remove(&old_function_by_id.arg_types);
507 if old_function_by_name.is_empty() {
508 self.function_by_name.remove(&old_function_by_id.name);
509 }
510 }
511
512 self.function_by_name
513 .entry(name)
514 .or_default()
515 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
516 self.function_by_id.insert(id, function_ref);
517 }
518
519 pub fn create_connection(&mut self, prost: &PbConnection) {
520 let name = prost.name.clone();
521 let id = prost.id;
522 let connection = ConnectionCatalog::from(prost);
523 let connection_ref = Arc::new(connection);
524 self.connection_by_name
525 .try_insert(name, connection_ref.clone())
526 .unwrap();
527 self.connection_by_id
528 .try_insert(id, connection_ref)
529 .unwrap();
530 }
531
532 pub fn update_connection(&mut self, prost: &PbConnection) {
533 let name = prost.name.clone();
534 let id = prost.id;
535 let connection = ConnectionCatalog::from(prost);
536 let connection_ref = Arc::new(connection);
537
538 let old_connection = self.connection_by_id.get(&id).unwrap();
539 if old_connection.name != name
541 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
542 && conn.id == id
543 {
544 self.connection_by_name.remove(&old_connection.name);
545 }
546
547 self.connection_by_name.insert(name, connection_ref.clone());
548 self.connection_by_id.insert(id, connection_ref);
549 }
550
551 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
552 let connection_ref = self
553 .connection_by_id
554 .remove(&connection_id)
555 .expect("connection not found by id");
556 self.connection_by_name
557 .remove(&connection_ref.name)
558 .expect("connection not found by name");
559 }
560
561 pub fn create_secret(&mut self, prost: &PbSecret) {
562 let name = prost.name.clone();
563 let id = SecretId::new(prost.id);
564 let secret = SecretCatalog::from(prost);
565 let secret_ref = Arc::new(secret);
566
567 self.secret_by_id
568 .try_insert(id, secret_ref.clone())
569 .unwrap();
570 self.secret_by_name
571 .try_insert(name, secret_ref.clone())
572 .unwrap();
573 }
574
575 pub fn update_secret(&mut self, prost: &PbSecret) {
576 let name = prost.name.clone();
577 let id = SecretId::new(prost.id);
578 let secret = SecretCatalog::from(prost);
579 let secret_ref = Arc::new(secret);
580
581 let old_secret = self.secret_by_id.get(&id).unwrap();
582 if old_secret.name != name
584 && let Some(s) = self.secret_by_name.get(&old_secret.name)
585 && s.id == id
586 {
587 self.secret_by_name.remove(&old_secret.name);
588 }
589
590 self.secret_by_name.insert(name, secret_ref.clone());
591 self.secret_by_id.insert(id, secret_ref);
592 }
593
594 pub fn drop_secret(&mut self, secret_id: SecretId) {
595 let secret_ref = self
596 .secret_by_id
597 .remove(&secret_id)
598 .expect("secret not found by id");
599 self.secret_by_name
600 .remove(&secret_ref.name)
601 .expect("secret not found by name");
602 }
603
604 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
605 self.table_by_name.values()
606 }
607
608 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
609 self.table_by_name.values().filter(|v| v.is_user_table())
610 }
611
612 pub fn iter_user_table_with_acl<'a>(
613 &'a self,
614 user: &'a UserCatalog,
615 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
616 self.table_by_name
617 .values()
618 .filter(|v| v.is_user_table() && has_access_to_object(user, v.id.table_id, v.owner))
619 }
620
621 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
622 self.table_by_name
623 .values()
624 .filter(|v| v.is_internal_table())
625 }
626
627 pub fn iter_internal_table_with_acl<'a>(
628 &'a self,
629 user: &'a UserCatalog,
630 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
631 self.table_by_name
632 .values()
633 .filter(|v| v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner))
634 }
635
636 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
638 self.table_by_name
639 .values()
640 .filter(|v| !v.is_internal_table())
641 }
642
643 pub fn iter_table_mv_indices_with_acl<'a>(
644 &'a self,
645 user: &'a UserCatalog,
646 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
647 self.table_by_name.values().filter(|v| {
648 !v.is_internal_table() && has_access_to_object(user, v.id.table_id, v.owner)
649 })
650 }
651
652 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
654 self.table_by_name.values().filter(|v| v.is_mview())
655 }
656
657 pub fn iter_all_mvs_with_acl<'a>(
658 &'a self,
659 user: &'a UserCatalog,
660 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
661 self.table_by_name
662 .values()
663 .filter(|v| v.is_mview() && has_access_to_object(user, v.id.table_id, v.owner))
664 }
665
666 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
668 self.table_by_name
669 .values()
670 .filter(|v| v.is_mview() && v.is_created())
671 }
672
673 pub fn iter_created_mvs_with_acl<'a>(
674 &'a self,
675 user: &'a UserCatalog,
676 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
677 self.table_by_name.values().filter(|v| {
678 v.is_mview() && v.is_created() && has_access_to_object(user, v.id.table_id, v.owner)
679 })
680 }
681
682 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
684 self.index_by_name.values()
685 }
686
687 pub fn iter_index_with_acl<'a>(
688 &'a self,
689 user: &'a UserCatalog,
690 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
691 self.index_by_name
692 .values()
693 .filter(|idx| has_access_to_object(user, idx.id.index_id, idx.owner()))
694 }
695
696 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
698 self.source_by_name.values()
699 }
700
701 pub fn iter_source_with_acl<'a>(
702 &'a self,
703 user: &'a UserCatalog,
704 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
705 self.source_by_name
706 .values()
707 .filter(|s| has_access_to_object(user, s.id, s.owner))
708 }
709
710 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
711 self.sink_by_name.values()
712 }
713
714 pub fn iter_sink_with_acl<'a>(
715 &'a self,
716 user: &'a UserCatalog,
717 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
718 self.sink_by_name
719 .values()
720 .filter(|s| has_access_to_object(user, s.id.sink_id, s.owner.user_id))
721 }
722
723 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
724 self.subscription_by_name.values()
725 }
726
727 pub fn iter_subscription_with_acl<'a>(
728 &'a self,
729 user: &'a UserCatalog,
730 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
731 self.subscription_by_name
732 .values()
733 .filter(|s| has_access_to_object(user, s.id.subscription_id, s.owner.user_id))
734 }
735
736 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
737 self.view_by_name.values()
738 }
739
740 pub fn iter_view_with_acl<'a>(
741 &'a self,
742 user: &'a UserCatalog,
743 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
744 self.view_by_name
745 .values()
746 .filter(|v| v.is_system_view() || has_access_to_object(user, v.id, v.owner))
747 }
748
749 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
750 self.function_by_name.values().flat_map(|v| v.values())
751 }
752
753 pub fn iter_function_with_acl<'a>(
754 &'a self,
755 user: &'a UserCatalog,
756 ) -> impl Iterator<Item = &'a Arc<FunctionCatalog>> {
757 self.function_by_name
758 .values()
759 .flat_map(|v| v.values())
760 .filter(|f| has_access_to_object(user, f.id.function_id(), f.owner))
761 }
762
763 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
764 self.connection_by_name.values()
765 }
766
767 pub fn iter_connections_with_acl<'a>(
768 &'a self,
769 user: &'a UserCatalog,
770 ) -> impl Iterator<Item = &'a Arc<ConnectionCatalog>> {
771 self.connection_by_name
772 .values()
773 .filter(|c| has_access_to_object(user, c.id, c.owner))
774 }
775
776 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
777 self.secret_by_name.values()
778 }
779
780 pub fn iter_secret_with_acl<'a>(
781 &'a self,
782 user: &'a UserCatalog,
783 ) -> impl Iterator<Item = &'a Arc<SecretCatalog>> {
784 self.secret_by_name
785 .values()
786 .filter(|s| has_access_to_object(user, s.id.secret_id(), s.owner))
787 }
788
789 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
790 self.system_table_by_name.values()
791 }
792
793 pub fn get_table_by_name(
794 &self,
795 table_name: &str,
796 bind_creating_relations: bool,
797 ) -> Option<&Arc<TableCatalog>> {
798 self.table_by_name
799 .get(table_name)
800 .filter(|&table| bind_creating_relations || table.is_created())
801 }
802
803 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
804 self.get_table_by_name(table_name, true)
805 }
806
807 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
808 self.get_table_by_name(table_name, false)
809 }
810
811 pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
812 self.table_by_id.get(table_id)
813 }
814
815 pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
816 self.table_by_id
817 .get(table_id)
818 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
819 }
820
821 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
822 self.view_by_name.get(view_name)
823 }
824
825 pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
826 self.view_by_id.get(view_id)
827 }
828
829 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
830 self.source_by_name.get(source_name)
831 }
832
833 pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
834 self.source_by_id.get(source_id)
835 }
836
837 pub fn get_sink_by_name(
838 &self,
839 sink_name: &str,
840 bind_creating: bool,
841 ) -> Option<&Arc<SinkCatalog>> {
842 self.sink_by_name
843 .get(sink_name)
844 .filter(|s| bind_creating || s.is_created())
845 }
846
847 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
848 self.get_sink_by_name(sink_name, true)
849 }
850
851 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
852 self.get_sink_by_name(sink_name, false)
853 }
854
855 pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
856 self.sink_by_id.get(sink_id)
857 }
858
859 pub fn get_subscription_by_name(
860 &self,
861 subscription_name: &str,
862 ) -> Option<&Arc<SubscriptionCatalog>> {
863 self.subscription_by_name.get(subscription_name)
864 }
865
866 pub fn get_subscription_by_id(
867 &self,
868 subscription_id: &SubscriptionId,
869 ) -> Option<&Arc<SubscriptionCatalog>> {
870 self.subscription_by_id.get(subscription_id)
871 }
872
873 pub fn get_index_by_name(
874 &self,
875 index_name: &str,
876 bind_creating: bool,
877 ) -> Option<&Arc<IndexCatalog>> {
878 self.index_by_name
879 .get(index_name)
880 .filter(|i| bind_creating || i.is_created())
881 }
882
883 pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
884 self.get_index_by_name(index_name, true)
885 }
886
887 pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
888 self.get_index_by_name(index_name, false)
889 }
890
891 pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
892 self.index_by_id.get(index_id)
893 }
894
895 pub fn get_indexes_by_table_id(
896 &self,
897 table_id: &TableId,
898 include_creating: bool,
899 ) -> Vec<Arc<IndexCatalog>> {
900 self.indexes_by_table_id
901 .get(table_id)
902 .cloned()
903 .unwrap_or_default()
904 .into_iter()
905 .filter(|i| include_creating || i.is_created())
906 .collect()
907 }
908
909 pub fn get_any_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
910 self.get_indexes_by_table_id(table_id, true)
911 }
912
913 pub fn get_created_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
914 self.get_indexes_by_table_id(table_id, false)
915 }
916
917 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
918 self.system_table_by_name.get(table_name)
919 }
920
921 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
922 self.table_by_id
923 .get(&table_id)
924 .map(|table| table.name.clone())
925 }
926
927 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
928 self.function_by_id.get(&function_id)
929 }
930
931 pub fn get_function_by_name_inputs(
932 &self,
933 name: &str,
934 inputs: &mut [ExprImpl],
935 ) -> Option<&Arc<FunctionCatalog>> {
936 infer_type_with_sigmap(
937 FuncName::Udf(name.to_owned()),
938 inputs,
939 &self.function_registry,
940 )
941 .ok()?;
942 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
943 self.function_by_name.get(name)?.get(&args)
944 }
945
946 pub fn get_function_by_name_args(
947 &self,
948 name: &str,
949 args: &[DataType],
950 ) -> Option<&Arc<FunctionCatalog>> {
951 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
952 let func = infer_type_name(
953 &self.function_registry,
954 FuncName::Udf(name.to_owned()),
955 &args,
956 )
957 .ok()?;
958
959 let args = func
960 .inputs_type
961 .iter()
962 .filter_map(|x| {
963 if let SigDataType::Exact(t) = x {
964 Some(t.clone())
965 } else {
966 None
967 }
968 })
969 .collect_vec();
970
971 self.function_by_name.get(name)?.get(&args)
972 }
973
974 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
975 let functions = self.function_by_name.get(name)?;
976 if functions.is_empty() {
977 return None;
978 }
979 Some(functions.values().collect())
980 }
981
982 pub fn get_connection_by_id(
983 &self,
984 connection_id: &ConnectionId,
985 ) -> Option<&Arc<ConnectionCatalog>> {
986 self.connection_by_id.get(connection_id)
987 }
988
989 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
990 self.connection_by_name.get(connection_name)
991 }
992
993 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
994 self.secret_by_name.get(secret_name)
995 }
996
997 pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
998 self.secret_by_id.get(secret_id)
999 }
1000
1001 pub fn get_source_ids_by_connection(
1003 &self,
1004 connection_id: ConnectionId,
1005 ) -> Option<Vec<SourceId>> {
1006 self.connection_source_ref
1007 .get(&connection_id)
1008 .map(|c| c.to_owned())
1009 }
1010
1011 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
1013 self.connection_sink_ref
1014 .get(&connection_id)
1015 .map(|s| s.to_owned())
1016 }
1017
1018 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
1019 #[allow(clippy::manual_map)]
1020 if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
1021 Some(OwnedGrantObject {
1022 owner: table.owner,
1023 object: Object::TableId(oid),
1024 })
1025 } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
1026 Some(OwnedGrantObject {
1027 owner: index.owner(),
1028 object: Object::TableId(oid),
1029 })
1030 } else if let Some(source) = self.get_source_by_id(&oid) {
1031 Some(OwnedGrantObject {
1032 owner: source.owner,
1033 object: Object::SourceId(oid),
1034 })
1035 } else if let Some(sink) = self.get_sink_by_id(&oid) {
1036 Some(OwnedGrantObject {
1037 owner: sink.owner.user_id,
1038 object: Object::SinkId(oid),
1039 })
1040 } else if let Some(view) = self.get_view_by_id(&oid) {
1041 Some(OwnedGrantObject {
1042 owner: view.owner,
1043 object: Object::ViewId(oid),
1044 })
1045 } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
1046 Some(OwnedGrantObject {
1047 owner: function.owner(),
1048 object: Object::FunctionId(oid),
1049 })
1050 } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
1051 Some(OwnedGrantObject {
1052 owner: subscription.owner.user_id,
1053 object: Object::SubscriptionId(oid),
1054 })
1055 } else if let Some(connection) = self.get_connection_by_id(&oid) {
1056 Some(OwnedGrantObject {
1057 owner: connection.owner,
1058 object: Object::ConnectionId(oid),
1059 })
1060 } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
1061 Some(OwnedGrantObject {
1062 owner: secret.owner,
1063 object: Object::SecretId(oid),
1064 })
1065 } else {
1066 None
1067 }
1068 }
1069
1070 pub fn contains_object(&self, oid: u32) -> bool {
1071 self.table_by_id.contains_key(&TableId::new(oid))
1072 || self.index_by_id.contains_key(&IndexId::new(oid))
1073 || self.source_by_id.contains_key(&oid)
1074 || self.sink_by_id.contains_key(&oid)
1075 || self.view_by_id.contains_key(&oid)
1076 || self.function_by_id.contains_key(&FunctionId::new(oid))
1077 || self.subscription_by_id.contains_key(&oid)
1078 || self.connection_by_id.contains_key(&oid)
1079 }
1080
1081 pub fn id(&self) -> SchemaId {
1082 self.id
1083 }
1084
1085 pub fn database_id(&self) -> DatabaseId {
1086 self.database_id
1087 }
1088
1089 pub fn name(&self) -> String {
1090 self.name.clone()
1091 }
1092}
1093
1094impl OwnedByUserCatalog for SchemaCatalog {
1095 fn owner(&self) -> UserId {
1096 self.owner
1097 }
1098}
1099
1100impl From<&PbSchema> for SchemaCatalog {
1101 fn from(schema: &PbSchema) -> Self {
1102 Self {
1103 id: schema.id,
1104 owner: schema.owner,
1105 name: schema.name.clone(),
1106 database_id: schema.database_id,
1107 table_by_name: HashMap::new(),
1108 table_by_id: HashMap::new(),
1109 source_by_name: HashMap::new(),
1110 source_by_id: HashMap::new(),
1111 sink_by_name: HashMap::new(),
1112 sink_by_id: HashMap::new(),
1113 table_incoming_sinks: HashMap::new(),
1114 index_by_name: HashMap::new(),
1115 index_by_id: HashMap::new(),
1116 indexes_by_table_id: HashMap::new(),
1117 system_table_by_name: HashMap::new(),
1118 view_by_name: HashMap::new(),
1119 view_by_id: HashMap::new(),
1120 function_registry: FunctionRegistry::default(),
1121 function_by_name: HashMap::new(),
1122 function_by_id: HashMap::new(),
1123 connection_by_name: HashMap::new(),
1124 connection_by_id: HashMap::new(),
1125 secret_by_name: HashMap::new(),
1126 secret_by_id: HashMap::new(),
1127 _secret_source_ref: HashMap::new(),
1128 _secret_sink_ref: HashMap::new(),
1129 connection_source_ref: HashMap::new(),
1130 connection_sink_ref: HashMap::new(),
1131 subscription_by_name: HashMap::new(),
1132 subscription_by_id: HashMap::new(),
1133 }
1134 }
1135}