1use std::collections::hash_map::Entry::{Occupied, Vacant};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
21use risingwave_common::types::DataType;
22use risingwave_connector::sink::catalog::SinkCatalog;
23pub use risingwave_expr::sig::*;
24use risingwave_pb::catalog::{
25 PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription,
26 PbTable, PbView,
27};
28use risingwave_pb::user::grant_privilege::Object;
29
30use super::subscription_catalog::SubscriptionCatalog;
31use super::{OwnedByUserCatalog, OwnedGrantObject, SubscriptionId};
32use crate::catalog::connection_catalog::ConnectionCatalog;
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::catalog::index_catalog::IndexCatalog;
35use crate::catalog::secret_catalog::SecretCatalog;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::system_catalog::SystemTableCatalog;
38use crate::catalog::table_catalog::TableCatalog;
39use crate::catalog::view_catalog::ViewCatalog;
40use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
41use crate::expr::{Expr, ExprImpl, infer_type_name, infer_type_with_sigmap};
42use crate::user::user_catalog::UserCatalog;
43use crate::user::{UserId, has_access_to_object};
44
45#[derive(Clone, Debug)]
46pub struct SchemaCatalog {
47 id: SchemaId,
48 pub name: String,
49 pub database_id: DatabaseId,
50 table_by_name: HashMap<String, Arc<TableCatalog>>,
52 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
54 source_by_name: HashMap<String, Arc<SourceCatalog>>,
55 source_by_id: HashMap<SourceId, Arc<SourceCatalog>>,
56 sink_by_name: HashMap<String, Arc<SinkCatalog>>,
57 sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
58 table_incoming_sinks: HashMap<TableId, HashSet<SinkId>>,
60 subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
61 subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
62 index_by_name: HashMap<String, Arc<IndexCatalog>>,
63 index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
64 indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
65 view_by_name: HashMap<String, Arc<ViewCatalog>>,
66 view_by_id: HashMap<ViewId, Arc<ViewCatalog>>,
67 function_registry: FunctionRegistry,
68 function_by_name: HashMap<String, HashMap<Vec<DataType>, Arc<FunctionCatalog>>>,
69 function_by_id: HashMap<FunctionId, Arc<FunctionCatalog>>,
70 connection_by_name: HashMap<String, Arc<ConnectionCatalog>>,
71 connection_by_id: HashMap<ConnectionId, Arc<ConnectionCatalog>>,
72 secret_by_name: HashMap<String, Arc<SecretCatalog>>,
73 secret_by_id: HashMap<SecretId, Arc<SecretCatalog>>,
74
75 _secret_source_ref: HashMap<SecretId, Vec<SourceId>>,
76 _secret_sink_ref: HashMap<SecretId, Vec<SinkId>>,
77
78 connection_source_ref: HashMap<ConnectionId, Vec<SourceId>>,
80 connection_sink_ref: HashMap<ConnectionId, Vec<SinkId>>,
82 system_table_by_name: HashMap<String, Arc<SystemTableCatalog>>,
84 pub owner: u32,
85}
86
87impl SchemaCatalog {
88 pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
89 let name = prost.name.clone();
90 let id = prost.id.into();
91 let table: TableCatalog = prost.into();
92 let table_ref = Arc::new(table);
93
94 self.table_by_name
95 .try_insert(name, table_ref.clone())
96 .unwrap();
97 self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
98 table_ref
99 }
100
101 pub fn create_sys_table(&mut self, sys_table: Arc<SystemTableCatalog>) {
102 self.system_table_by_name
103 .try_insert(sys_table.name.clone(), sys_table)
104 .unwrap();
105 }
106
107 pub fn create_sys_view(&mut self, sys_view: Arc<ViewCatalog>) {
108 self.view_by_name
109 .try_insert(sys_view.name().to_owned(), sys_view.clone())
110 .unwrap();
111 self.view_by_id
112 .try_insert(sys_view.id, sys_view.clone())
113 .unwrap();
114 }
115
116 pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
117 let name = prost.name.clone();
118 let id = prost.id.into();
119 let table: TableCatalog = prost.into();
120 let table_ref = Arc::new(table);
121
122 let old_table = self.table_by_id.get(&id).unwrap();
123 if old_table.name() != name
125 && let Some(t) = self.table_by_name.get(old_table.name())
126 && t.id == id
127 {
128 self.table_by_name.remove(old_table.name());
129 }
130
131 self.table_by_name.insert(name, table_ref.clone());
132 self.table_by_id.insert(id, table_ref.clone());
133 table_ref
134 }
135
136 pub fn update_index(&mut self, prost: &PbIndex) {
137 let name = prost.name.clone();
138 let id = prost.id.into();
139 let old_index = self.index_by_id.get(&id).unwrap();
140 let index_table = self
141 .get_created_table_by_id(&prost.index_table_id.into())
142 .unwrap();
143 let primary_table = self
144 .get_created_table_by_id(&prost.primary_table_id.into())
145 .unwrap();
146 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
147 let index_ref = Arc::new(index);
148
149 if old_index.name != name
151 && let Some(idx) = self.index_by_name.get(&old_index.name)
152 && idx.id == id
153 {
154 self.index_by_name.remove(&old_index.name);
155 }
156 self.index_by_name.insert(name, index_ref.clone());
157 self.index_by_id.insert(id, index_ref.clone());
158
159 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
160 Occupied(mut entry) => {
161 let pos = entry
162 .get()
163 .iter()
164 .position(|x| x.id == index_ref.id)
165 .unwrap();
166 *entry.get_mut().get_mut(pos).unwrap() = index_ref;
167 }
168 Vacant(_entry) => {
169 unreachable!()
170 }
171 };
172 }
173
174 pub fn drop_table(&mut self, id: TableId) {
175 if let Some(table_ref) = self.table_by_id.remove(&id) {
176 self.table_by_name.remove(&table_ref.name).unwrap();
177 self.indexes_by_table_id.remove(&table_ref.id);
178 } else {
179 tracing::warn!(
180 id = ?id.table_id,
181 "table not found when dropping, frontend might not be notified yet"
182 );
183 }
184 }
185
186 pub fn create_index(&mut self, prost: &PbIndex) {
187 let name = prost.name.clone();
188 let id = prost.id.into();
189 let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
190 let primary_table = self
191 .get_created_table_by_id(&prost.primary_table_id.into())
192 .unwrap();
193 let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
194 let index_ref = Arc::new(index);
195
196 self.index_by_name
197 .try_insert(name, index_ref.clone())
198 .unwrap();
199 self.index_by_id.try_insert(id, index_ref.clone()).unwrap();
200 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
201 Occupied(mut entry) => {
202 entry.get_mut().push(index_ref);
203 }
204 Vacant(entry) => {
205 entry.insert(vec![index_ref]);
206 }
207 };
208 }
209
210 pub fn drop_index(&mut self, id: IndexId) {
211 let index_ref = self.index_by_id.remove(&id).unwrap();
212 self.index_by_name.remove(&index_ref.name).unwrap();
213 match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
214 Occupied(mut entry) => {
215 let pos = entry
216 .get_mut()
217 .iter()
218 .position(|x| x.id == index_ref.id)
219 .unwrap();
220 entry.get_mut().remove(pos);
221 }
222 Vacant(_entry) => (),
223 };
224 }
225
226 pub fn create_source(&mut self, prost: &PbSource) {
227 let name = prost.name.clone();
228 let id = prost.id;
229 let source = SourceCatalog::from(prost);
230 let source_ref = Arc::new(source);
231
232 if let Some(connection_id) = source_ref.connection_id {
233 self.connection_source_ref
234 .entry(connection_id)
235 .and_modify(|sources| sources.push(source_ref.id))
236 .or_insert(vec![source_ref.id]);
237 }
238
239 self.source_by_name
240 .try_insert(name, source_ref.clone())
241 .unwrap();
242 self.source_by_id.try_insert(id, source_ref).unwrap();
243 }
244
245 pub fn drop_source(&mut self, id: SourceId) {
246 let source_ref = self.source_by_id.remove(&id).unwrap();
247 self.source_by_name.remove(&source_ref.name).unwrap();
248 if let Some(connection_id) = source_ref.connection_id
249 && let Occupied(mut e) = self.connection_source_ref.entry(connection_id)
250 {
251 let source_ids = e.get_mut();
252 source_ids.retain_mut(|sid| *sid != id);
253 if source_ids.is_empty() {
254 e.remove_entry();
255 }
256 }
257 }
258
259 pub fn update_source(&mut self, prost: &PbSource) {
260 let name = prost.name.clone();
261 let id = prost.id;
262 let source = SourceCatalog::from(prost);
263 let source_ref = Arc::new(source);
264
265 let old_source = self.source_by_id.get(&id).unwrap();
266 if old_source.name != name
268 && let Some(src) = self.source_by_name.get(&old_source.name)
269 && src.id == id
270 {
271 self.source_by_name.remove(&old_source.name);
272 }
273
274 self.source_by_name.insert(name, source_ref.clone());
275 self.source_by_id.insert(id, source_ref);
276 }
277
278 pub fn create_sink(&mut self, prost: &PbSink) {
279 let name = prost.name.clone();
280 let id = prost.id;
281 let sink = SinkCatalog::from(prost);
282 let sink_ref = Arc::new(sink);
283
284 if let Some(connection_id) = sink_ref.connection_id {
285 self.connection_sink_ref
286 .entry(connection_id.0)
287 .and_modify(|sinks| sinks.push(id))
288 .or_insert(vec![id]);
289 }
290
291 if let Some(target_table) = sink_ref.target_table {
292 assert!(
293 self.table_incoming_sinks
294 .entry(target_table)
295 .or_default()
296 .insert(sink_ref.id.sink_id)
297 );
298 }
299
300 self.sink_by_name
301 .try_insert(name, sink_ref.clone())
302 .unwrap();
303 self.sink_by_id.try_insert(id, sink_ref).unwrap();
304 }
305
306 pub fn drop_sink(&mut self, id: SinkId) {
307 if let Some(sink_ref) = self.sink_by_id.remove(&id) {
308 self.sink_by_name.remove(&sink_ref.name).unwrap();
309 if let Some(connection_id) = sink_ref.connection_id
310 && let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0)
311 {
312 let sink_ids = e.get_mut();
313 sink_ids.retain_mut(|sid| *sid != id);
314 if sink_ids.is_empty() {
315 e.remove_entry();
316 }
317 }
318 if let Some(target_table) = sink_ref.target_table {
319 let incoming_sinks = self
320 .table_incoming_sinks
321 .get_mut(&target_table)
322 .expect("should exists");
323 assert!(incoming_sinks.remove(&sink_ref.id.sink_id));
324 if incoming_sinks.is_empty() {
325 self.table_incoming_sinks.remove(&target_table);
326 }
327 }
328 } else {
329 tracing::warn!(
330 id,
331 "sink not found when dropping, frontend might not be notified yet"
332 );
333 }
334 }
335
336 pub fn update_sink(&mut self, prost: &PbSink) {
337 let name = prost.name.clone();
338 let id = prost.id;
339 let sink = SinkCatalog::from(prost);
340 let sink_ref = Arc::new(sink);
341
342 let old_sink = self.sink_by_id.get(&id).unwrap();
343 assert_eq!(sink_ref.target_table, old_sink.target_table);
344 if old_sink.name != name
346 && let Some(s) = self.sink_by_name.get(&old_sink.name)
347 && s.id.sink_id == id
348 {
349 self.sink_by_name.remove(&old_sink.name);
350 }
351
352 self.sink_by_name.insert(name, sink_ref.clone());
353 self.sink_by_id.insert(id, sink_ref);
354 }
355
356 pub fn table_incoming_sinks(&self, table_id: TableId) -> Option<&HashSet<SinkId>> {
357 self.table_incoming_sinks.get(&table_id)
358 }
359
360 pub fn create_subscription(&mut self, prost: &PbSubscription) {
361 let name = prost.name.clone();
362 let id = prost.id;
363 let subscription_catalog = SubscriptionCatalog::from(prost);
364 let subscription_ref = Arc::new(subscription_catalog);
365
366 self.subscription_by_name
367 .try_insert(name, subscription_ref.clone())
368 .unwrap();
369 self.subscription_by_id
370 .try_insert(id, subscription_ref)
371 .unwrap();
372 }
373
374 pub fn drop_subscription(&mut self, id: SubscriptionId) {
375 let subscription_ref = self.subscription_by_id.remove(&id);
376 if let Some(subscription_ref) = subscription_ref {
377 self.subscription_by_name.remove(&subscription_ref.name);
378 }
379 }
380
381 pub fn update_subscription(&mut self, prost: &PbSubscription) {
382 let name = prost.name.clone();
383 let id = prost.id;
384 let subscription = SubscriptionCatalog::from(prost);
385 let subscription_ref = Arc::new(subscription);
386
387 let old_subscription = self.subscription_by_id.get(&id).unwrap();
388 if old_subscription.name != name
390 && let Some(s) = self.subscription_by_name.get(&old_subscription.name)
391 && s.id.subscription_id == id
392 {
393 self.subscription_by_name.remove(&old_subscription.name);
394 }
395
396 self.subscription_by_name
397 .insert(name, subscription_ref.clone());
398 self.subscription_by_id.insert(id, subscription_ref);
399 }
400
401 pub fn create_view(&mut self, prost: &PbView) {
402 let name = prost.name.clone();
403 let id = prost.id;
404 let view = ViewCatalog::from(prost);
405 let view_ref = Arc::new(view);
406
407 self.view_by_name
408 .try_insert(name, view_ref.clone())
409 .unwrap();
410 self.view_by_id.try_insert(id, view_ref).unwrap();
411 }
412
413 pub fn drop_view(&mut self, id: ViewId) {
414 let view_ref = self.view_by_id.remove(&id).unwrap();
415 self.view_by_name.remove(&view_ref.name).unwrap();
416 }
417
418 pub fn update_view(&mut self, prost: &PbView) {
419 let name = prost.name.clone();
420 let id = prost.id;
421 let view = ViewCatalog::from(prost);
422 let view_ref = Arc::new(view);
423
424 let old_view = self.view_by_id.get(&id).unwrap();
425 if old_view.name != name
427 && let Some(v) = self.view_by_name.get(old_view.name())
428 && v.id == id
429 {
430 self.view_by_name.remove(&old_view.name);
431 }
432
433 self.view_by_name.insert(name, view_ref.clone());
434 self.view_by_id.insert(id, view_ref);
435 }
436
437 pub fn get_func_sign(func: &FunctionCatalog) -> FuncSign {
438 FuncSign {
439 name: FuncName::Udf(func.name.clone()),
440 inputs_type: func
441 .arg_types
442 .iter()
443 .map(|t| t.clone().into())
444 .collect_vec(),
445 variadic: false,
446 ret_type: func.return_type.clone().into(),
447 build: FuncBuilder::Udf,
448 type_infer: |_| Ok(DataType::Boolean),
450 deprecated: false,
451 }
452 }
453
454 pub fn create_function(&mut self, prost: &PbFunction) {
455 let name = prost.name.clone();
456 let id = prost.id;
457 let function = FunctionCatalog::from(prost);
458 let args = function.arg_types.clone();
459 let function_ref = Arc::new(function);
460
461 self.function_registry
462 .insert(Self::get_func_sign(&function_ref));
463 self.function_by_name
464 .entry(name)
465 .or_default()
466 .try_insert(args, function_ref.clone())
467 .expect("function already exists with same argument types");
468 self.function_by_id
469 .try_insert(id.into(), function_ref)
470 .expect("function id exists");
471 }
472
473 pub fn drop_function(&mut self, id: FunctionId) {
474 let function_ref = self
475 .function_by_id
476 .remove(&id)
477 .expect("function not found by id");
478
479 self.function_registry
480 .remove(Self::get_func_sign(&function_ref))
481 .expect("function not found in registry");
482
483 self.function_by_name
484 .get_mut(&function_ref.name)
485 .expect("function not found by name")
486 .remove(&function_ref.arg_types)
487 .expect("function not found by argument types");
488 }
489
490 pub fn update_function(&mut self, prost: &PbFunction) {
491 let name = prost.name.clone();
492 let id = prost.id.into();
493 let function = FunctionCatalog::from(prost);
494 let function_ref = Arc::new(function);
495
496 let old_function_by_id = self.function_by_id.get(&id).unwrap();
497 let old_function_by_name = self
498 .function_by_name
499 .get_mut(&old_function_by_id.name)
500 .unwrap();
501 if old_function_by_id.name != name
503 && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
504 && f.id == id
505 {
506 old_function_by_name.remove(&old_function_by_id.arg_types);
507 if old_function_by_name.is_empty() {
508 self.function_by_name.remove(&old_function_by_id.name);
509 }
510 }
511
512 self.function_by_name
513 .entry(name)
514 .or_default()
515 .insert(old_function_by_id.arg_types.clone(), function_ref.clone());
516 self.function_by_id.insert(id, function_ref);
517 }
518
519 pub fn create_connection(&mut self, prost: &PbConnection) {
520 let name = prost.name.clone();
521 let id = prost.id;
522 let connection = ConnectionCatalog::from(prost);
523 let connection_ref = Arc::new(connection);
524 self.connection_by_name
525 .try_insert(name, connection_ref.clone())
526 .unwrap();
527 self.connection_by_id
528 .try_insert(id, connection_ref)
529 .unwrap();
530 }
531
532 pub fn update_connection(&mut self, prost: &PbConnection) {
533 let name = prost.name.clone();
534 let id = prost.id;
535 let connection = ConnectionCatalog::from(prost);
536 let connection_ref = Arc::new(connection);
537
538 let old_connection = self.connection_by_id.get(&id).unwrap();
539 if old_connection.name != name
541 && let Some(conn) = self.connection_by_name.get(&old_connection.name)
542 && conn.id == id
543 {
544 self.connection_by_name.remove(&old_connection.name);
545 }
546
547 self.connection_by_name.insert(name, connection_ref.clone());
548 self.connection_by_id.insert(id, connection_ref);
549 }
550
551 pub fn drop_connection(&mut self, connection_id: ConnectionId) {
552 let connection_ref = self
553 .connection_by_id
554 .remove(&connection_id)
555 .expect("connection not found by id");
556 self.connection_by_name
557 .remove(&connection_ref.name)
558 .expect("connection not found by name");
559 }
560
561 pub fn create_secret(&mut self, prost: &PbSecret) {
562 let name = prost.name.clone();
563 let id = SecretId::new(prost.id);
564 let secret = SecretCatalog::from(prost);
565 let secret_ref = Arc::new(secret);
566
567 self.secret_by_id
568 .try_insert(id, secret_ref.clone())
569 .unwrap();
570 self.secret_by_name
571 .try_insert(name, secret_ref.clone())
572 .unwrap();
573 }
574
575 pub fn update_secret(&mut self, prost: &PbSecret) {
576 let name = prost.name.clone();
577 let id = SecretId::new(prost.id);
578 let secret = SecretCatalog::from(prost);
579 let secret_ref = Arc::new(secret);
580
581 let old_secret = self.secret_by_id.get(&id).unwrap();
582 if old_secret.name != name
584 && let Some(s) = self.secret_by_name.get(&old_secret.name)
585 && s.id == id
586 {
587 self.secret_by_name.remove(&old_secret.name);
588 }
589
590 self.secret_by_name.insert(name, secret_ref.clone());
591 self.secret_by_id.insert(id, secret_ref);
592 }
593
594 pub fn drop_secret(&mut self, secret_id: SecretId) {
595 let secret_ref = self
596 .secret_by_id
597 .remove(&secret_id)
598 .expect("secret not found by id");
599 self.secret_by_name
600 .remove(&secret_ref.name)
601 .expect("secret not found by name");
602 }
603
604 pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
605 self.table_by_name.values()
606 }
607
608 pub fn iter_user_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
609 self.table_by_name.values().filter(|v| v.is_user_table())
610 }
611
612 pub fn iter_user_table_with_acl<'a>(
613 &'a self,
614 user: &'a UserCatalog,
615 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
616 self.table_by_name.values().filter(|v| {
617 v.is_user_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
618 })
619 }
620
621 pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
622 self.table_by_name
623 .values()
624 .filter(|v| v.is_internal_table())
625 }
626
627 pub fn iter_internal_table_with_acl<'a>(
628 &'a self,
629 user: &'a UserCatalog,
630 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
631 self.table_by_name.values().filter(|v| {
632 v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
633 })
634 }
635
636 pub fn iter_table_mv_indices(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
638 self.table_by_name
639 .values()
640 .filter(|v| !v.is_internal_table())
641 }
642
643 pub fn iter_table_mv_indices_with_acl<'a>(
644 &'a self,
645 user: &'a UserCatalog,
646 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
647 self.table_by_name.values().filter(|v| {
648 !v.is_internal_table() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
649 })
650 }
651
652 pub fn iter_all_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
654 self.table_by_name.values().filter(|v| v.is_mview())
655 }
656
657 pub fn iter_all_mvs_with_acl<'a>(
658 &'a self,
659 user: &'a UserCatalog,
660 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
661 self.table_by_name.values().filter(|v| {
662 v.is_mview() && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
663 })
664 }
665
666 pub fn iter_created_mvs(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
668 self.table_by_name
669 .values()
670 .filter(|v| v.is_mview() && v.is_created())
671 }
672
673 pub fn iter_created_mvs_with_acl<'a>(
674 &'a self,
675 user: &'a UserCatalog,
676 ) -> impl Iterator<Item = &'a Arc<TableCatalog>> {
677 self.table_by_name.values().filter(|v| {
678 v.is_mview()
679 && v.is_created()
680 && has_access_to_object(user, &self.name, v.id.table_id, v.owner)
681 })
682 }
683
684 pub fn iter_index(&self) -> impl Iterator<Item = &Arc<IndexCatalog>> {
686 self.index_by_name.values()
687 }
688
689 pub fn iter_index_with_acl<'a>(
690 &'a self,
691 user: &'a UserCatalog,
692 ) -> impl Iterator<Item = &'a Arc<IndexCatalog>> {
693 self.index_by_name
694 .values()
695 .filter(|idx| has_access_to_object(user, &self.name, idx.id.index_id, idx.owner()))
696 }
697
698 pub fn iter_source(&self) -> impl Iterator<Item = &Arc<SourceCatalog>> {
700 self.source_by_name.values()
701 }
702
703 pub fn iter_source_with_acl<'a>(
704 &'a self,
705 user: &'a UserCatalog,
706 ) -> impl Iterator<Item = &'a Arc<SourceCatalog>> {
707 self.source_by_name
708 .values()
709 .filter(|s| has_access_to_object(user, &self.name, s.id, s.owner))
710 }
711
712 pub fn iter_sink(&self) -> impl Iterator<Item = &Arc<SinkCatalog>> {
713 self.sink_by_name.values()
714 }
715
716 pub fn iter_sink_with_acl<'a>(
717 &'a self,
718 user: &'a UserCatalog,
719 ) -> impl Iterator<Item = &'a Arc<SinkCatalog>> {
720 self.sink_by_name
721 .values()
722 .filter(|s| has_access_to_object(user, &self.name, s.id.sink_id, s.owner.user_id))
723 }
724
725 pub fn iter_subscription(&self) -> impl Iterator<Item = &Arc<SubscriptionCatalog>> {
726 self.subscription_by_name.values()
727 }
728
729 pub fn iter_subscription_with_acl<'a>(
730 &'a self,
731 user: &'a UserCatalog,
732 ) -> impl Iterator<Item = &'a Arc<SubscriptionCatalog>> {
733 self.subscription_by_name.values().filter(|s| {
734 has_access_to_object(user, &self.name, s.id.subscription_id, s.owner.user_id)
735 })
736 }
737
738 pub fn iter_view(&self) -> impl Iterator<Item = &Arc<ViewCatalog>> {
739 self.view_by_name.values()
740 }
741
742 pub fn iter_view_with_acl<'a>(
743 &'a self,
744 user: &'a UserCatalog,
745 ) -> impl Iterator<Item = &'a Arc<ViewCatalog>> {
746 self.view_by_name
747 .values()
748 .filter(|v| v.is_system_view() || has_access_to_object(user, &self.name, v.id, v.owner))
749 }
750
751 pub fn iter_function(&self) -> impl Iterator<Item = &Arc<FunctionCatalog>> {
752 self.function_by_name.values().flat_map(|v| v.values())
753 }
754
755 pub fn iter_connections(&self) -> impl Iterator<Item = &Arc<ConnectionCatalog>> {
756 self.connection_by_name.values()
757 }
758
759 pub fn iter_secret(&self) -> impl Iterator<Item = &Arc<SecretCatalog>> {
760 self.secret_by_name.values()
761 }
762
763 pub fn iter_system_tables(&self) -> impl Iterator<Item = &Arc<SystemTableCatalog>> {
764 self.system_table_by_name.values()
765 }
766
767 pub fn get_table_by_name(
768 &self,
769 table_name: &str,
770 bind_creating_relations: bool,
771 ) -> Option<&Arc<TableCatalog>> {
772 self.table_by_name
773 .get(table_name)
774 .filter(|&table| bind_creating_relations || table.is_created())
775 }
776
777 pub fn get_any_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
778 self.get_table_by_name(table_name, true)
779 }
780
781 pub fn get_created_table_by_name(&self, table_name: &str) -> Option<&Arc<TableCatalog>> {
782 self.get_table_by_name(table_name, false)
783 }
784
785 pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
786 self.table_by_id.get(table_id)
787 }
788
789 pub fn get_created_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
790 self.table_by_id
791 .get(table_id)
792 .filter(|&table| table.stream_job_status == StreamJobStatus::Created)
793 }
794
795 pub fn get_view_by_name(&self, view_name: &str) -> Option<&Arc<ViewCatalog>> {
796 self.view_by_name.get(view_name)
797 }
798
799 pub fn get_view_by_id(&self, view_id: &ViewId) -> Option<&Arc<ViewCatalog>> {
800 self.view_by_id.get(view_id)
801 }
802
803 pub fn get_source_by_name(&self, source_name: &str) -> Option<&Arc<SourceCatalog>> {
804 self.source_by_name.get(source_name)
805 }
806
807 pub fn get_source_by_id(&self, source_id: &SourceId) -> Option<&Arc<SourceCatalog>> {
808 self.source_by_id.get(source_id)
809 }
810
811 pub fn get_sink_by_name(
812 &self,
813 sink_name: &str,
814 bind_creating: bool,
815 ) -> Option<&Arc<SinkCatalog>> {
816 self.sink_by_name
817 .get(sink_name)
818 .filter(|s| bind_creating || s.is_created())
819 }
820
821 pub fn get_any_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
822 self.get_sink_by_name(sink_name, true)
823 }
824
825 pub fn get_created_sink_by_name(&self, sink_name: &str) -> Option<&Arc<SinkCatalog>> {
826 self.get_sink_by_name(sink_name, false)
827 }
828
829 pub fn get_sink_by_id(&self, sink_id: &SinkId) -> Option<&Arc<SinkCatalog>> {
830 self.sink_by_id.get(sink_id)
831 }
832
833 pub fn get_subscription_by_name(
834 &self,
835 subscription_name: &str,
836 ) -> Option<&Arc<SubscriptionCatalog>> {
837 self.subscription_by_name.get(subscription_name)
838 }
839
840 pub fn get_subscription_by_id(
841 &self,
842 subscription_id: &SubscriptionId,
843 ) -> Option<&Arc<SubscriptionCatalog>> {
844 self.subscription_by_id.get(subscription_id)
845 }
846
847 pub fn get_index_by_name(
848 &self,
849 index_name: &str,
850 bind_creating: bool,
851 ) -> Option<&Arc<IndexCatalog>> {
852 self.index_by_name
853 .get(index_name)
854 .filter(|i| bind_creating || i.is_created())
855 }
856
857 pub fn get_any_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
858 self.get_index_by_name(index_name, true)
859 }
860
861 pub fn get_created_index_by_name(&self, index_name: &str) -> Option<&Arc<IndexCatalog>> {
862 self.get_index_by_name(index_name, false)
863 }
864
865 pub fn get_index_by_id(&self, index_id: &IndexId) -> Option<&Arc<IndexCatalog>> {
866 self.index_by_id.get(index_id)
867 }
868
869 pub fn get_indexes_by_table_id(
870 &self,
871 table_id: &TableId,
872 include_creating: bool,
873 ) -> Vec<Arc<IndexCatalog>> {
874 self.indexes_by_table_id
875 .get(table_id)
876 .cloned()
877 .unwrap_or_default()
878 .into_iter()
879 .filter(|i| include_creating || i.is_created())
880 .collect()
881 }
882
883 pub fn get_any_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
884 self.get_indexes_by_table_id(table_id, true)
885 }
886
887 pub fn get_created_indexes_by_table_id(&self, table_id: &TableId) -> Vec<Arc<IndexCatalog>> {
888 self.get_indexes_by_table_id(table_id, false)
889 }
890
891 pub fn get_system_table_by_name(&self, table_name: &str) -> Option<&Arc<SystemTableCatalog>> {
892 self.system_table_by_name.get(table_name)
893 }
894
895 pub fn get_table_name_by_id(&self, table_id: TableId) -> Option<String> {
896 self.table_by_id
897 .get(&table_id)
898 .map(|table| table.name.clone())
899 }
900
901 pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc<FunctionCatalog>> {
902 self.function_by_id.get(&function_id)
903 }
904
905 pub fn get_function_by_name_inputs(
906 &self,
907 name: &str,
908 inputs: &mut [ExprImpl],
909 ) -> Option<&Arc<FunctionCatalog>> {
910 infer_type_with_sigmap(
911 FuncName::Udf(name.to_owned()),
912 inputs,
913 &self.function_registry,
914 )
915 .ok()?;
916 let args = inputs.iter().map(|x| x.return_type()).collect_vec();
917 self.function_by_name.get(name)?.get(&args)
918 }
919
920 pub fn get_function_by_name_args(
921 &self,
922 name: &str,
923 args: &[DataType],
924 ) -> Option<&Arc<FunctionCatalog>> {
925 let args = args.iter().map(|x| Some(x.clone())).collect_vec();
926 let func = infer_type_name(
927 &self.function_registry,
928 FuncName::Udf(name.to_owned()),
929 &args,
930 )
931 .ok()?;
932
933 let args = func
934 .inputs_type
935 .iter()
936 .filter_map(|x| {
937 if let SigDataType::Exact(t) = x {
938 Some(t.clone())
939 } else {
940 None
941 }
942 })
943 .collect_vec();
944
945 self.function_by_name.get(name)?.get(&args)
946 }
947
948 pub fn get_functions_by_name(&self, name: &str) -> Option<Vec<&Arc<FunctionCatalog>>> {
949 let functions = self.function_by_name.get(name)?;
950 if functions.is_empty() {
951 return None;
952 }
953 Some(functions.values().collect())
954 }
955
956 pub fn get_connection_by_id(
957 &self,
958 connection_id: &ConnectionId,
959 ) -> Option<&Arc<ConnectionCatalog>> {
960 self.connection_by_id.get(connection_id)
961 }
962
963 pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc<ConnectionCatalog>> {
964 self.connection_by_name.get(connection_name)
965 }
966
967 pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc<SecretCatalog>> {
968 self.secret_by_name.get(secret_name)
969 }
970
971 pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc<SecretCatalog>> {
972 self.secret_by_id.get(secret_id)
973 }
974
975 pub fn get_source_ids_by_connection(
977 &self,
978 connection_id: ConnectionId,
979 ) -> Option<Vec<SourceId>> {
980 self.connection_source_ref
981 .get(&connection_id)
982 .map(|c| c.to_owned())
983 }
984
985 pub fn get_sink_ids_by_connection(&self, connection_id: ConnectionId) -> Option<Vec<SinkId>> {
987 self.connection_sink_ref
988 .get(&connection_id)
989 .map(|s| s.to_owned())
990 }
991
992 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
993 #[allow(clippy::manual_map)]
994 if let Some(table) = self.get_created_table_by_id(&TableId::new(oid)) {
995 Some(OwnedGrantObject {
996 owner: table.owner,
997 object: Object::TableId(oid),
998 })
999 } else if let Some(index) = self.get_index_by_id(&IndexId::new(oid)) {
1000 Some(OwnedGrantObject {
1001 owner: index.owner(),
1002 object: Object::TableId(oid),
1003 })
1004 } else if let Some(source) = self.get_source_by_id(&oid) {
1005 Some(OwnedGrantObject {
1006 owner: source.owner,
1007 object: Object::SourceId(oid),
1008 })
1009 } else if let Some(sink) = self.get_sink_by_id(&oid) {
1010 Some(OwnedGrantObject {
1011 owner: sink.owner.user_id,
1012 object: Object::SinkId(oid),
1013 })
1014 } else if let Some(view) = self.get_view_by_id(&oid) {
1015 Some(OwnedGrantObject {
1016 owner: view.owner,
1017 object: Object::ViewId(oid),
1018 })
1019 } else if let Some(function) = self.get_function_by_id(FunctionId::new(oid)) {
1020 Some(OwnedGrantObject {
1021 owner: function.owner(),
1022 object: Object::FunctionId(oid),
1023 })
1024 } else if let Some(subscription) = self.get_subscription_by_id(&oid) {
1025 Some(OwnedGrantObject {
1026 owner: subscription.owner.user_id,
1027 object: Object::SubscriptionId(oid),
1028 })
1029 } else if let Some(connection) = self.get_connection_by_id(&oid) {
1030 Some(OwnedGrantObject {
1031 owner: connection.owner,
1032 object: Object::ConnectionId(oid),
1033 })
1034 } else if let Some(secret) = self.get_secret_by_id(&SecretId::new(oid)) {
1035 Some(OwnedGrantObject {
1036 owner: secret.owner,
1037 object: Object::SecretId(oid),
1038 })
1039 } else {
1040 None
1041 }
1042 }
1043
1044 pub fn contains_object(&self, oid: u32) -> bool {
1045 self.table_by_id.contains_key(&TableId::new(oid))
1046 || self.index_by_id.contains_key(&IndexId::new(oid))
1047 || self.source_by_id.contains_key(&oid)
1048 || self.sink_by_id.contains_key(&oid)
1049 || self.view_by_id.contains_key(&oid)
1050 || self.function_by_id.contains_key(&FunctionId::new(oid))
1051 || self.subscription_by_id.contains_key(&oid)
1052 || self.connection_by_id.contains_key(&oid)
1053 }
1054
1055 pub fn id(&self) -> SchemaId {
1056 self.id
1057 }
1058
1059 pub fn database_id(&self) -> DatabaseId {
1060 self.database_id
1061 }
1062
1063 pub fn name(&self) -> String {
1064 self.name.clone()
1065 }
1066}
1067
1068impl OwnedByUserCatalog for SchemaCatalog {
1069 fn owner(&self) -> UserId {
1070 self.owner
1071 }
1072}
1073
1074impl From<&PbSchema> for SchemaCatalog {
1075 fn from(schema: &PbSchema) -> Self {
1076 Self {
1077 id: schema.id,
1078 owner: schema.owner,
1079 name: schema.name.clone(),
1080 database_id: schema.database_id,
1081 table_by_name: HashMap::new(),
1082 table_by_id: HashMap::new(),
1083 source_by_name: HashMap::new(),
1084 source_by_id: HashMap::new(),
1085 sink_by_name: HashMap::new(),
1086 sink_by_id: HashMap::new(),
1087 table_incoming_sinks: HashMap::new(),
1088 index_by_name: HashMap::new(),
1089 index_by_id: HashMap::new(),
1090 indexes_by_table_id: HashMap::new(),
1091 system_table_by_name: HashMap::new(),
1092 view_by_name: HashMap::new(),
1093 view_by_id: HashMap::new(),
1094 function_registry: FunctionRegistry::default(),
1095 function_by_name: HashMap::new(),
1096 function_by_id: HashMap::new(),
1097 connection_by_name: HashMap::new(),
1098 connection_by_id: HashMap::new(),
1099 secret_by_name: HashMap::new(),
1100 secret_by_id: HashMap::new(),
1101 _secret_source_ref: HashMap::new(),
1102 _secret_sink_ref: HashMap::new(),
1103 connection_source_ref: HashMap::new(),
1104 connection_sink_ref: HashMap::new(),
1105 subscription_by_name: HashMap::new(),
1106 subscription_by_id: HashMap::new(),
1107 }
1108 }
1109}