1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26 PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::common::PbObjectType;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
31
32use super::function_catalog::FunctionCatalog;
33use super::source_catalog::SourceCatalog;
34use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
35use super::view_catalog::ViewCatalog;
36use super::{
37 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
38};
39use crate::catalog::connection_catalog::ConnectionCatalog;
40use crate::catalog::database_catalog::DatabaseCatalog;
41use crate::catalog::schema_catalog::SchemaCatalog;
42use crate::catalog::secret_catalog::SecretCatalog;
43use crate::catalog::system_catalog::{
44 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
45};
46use crate::catalog::table_catalog::TableCatalog;
47use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
48use crate::expr::{Expr, ExprImpl};
49
50#[derive(Copy, Clone)]
51pub enum SchemaPath<'a> {
52 Name(&'a str),
53 Path(&'a SearchPath, &'a str),
55}
56
57impl<'a> SchemaPath<'a> {
58 pub fn new(
59 schema_name: Option<&'a str>,
60 search_path: &'a SearchPath,
61 user_name: &'a str,
62 ) -> Self {
63 match schema_name {
64 Some(schema_name) => SchemaPath::Name(schema_name),
65 None => SchemaPath::Path(search_path, user_name),
66 }
67 }
68
69 pub fn try_find<T, E>(
71 &self,
72 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
73 ) -> Result<Option<(T, &'a str)>, E> {
74 match self {
75 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
76 SchemaPath::Path(search_path, user_name) => {
77 for schema_name in search_path.path() {
78 let mut schema_name: &str = schema_name;
79 if schema_name == USER_NAME_WILD_CARD {
80 schema_name = user_name;
81 }
82 if let Ok(Some(res)) = f(schema_name) {
83 return Ok(Some((res, schema_name)));
84 }
85 }
86 Ok(None)
87 }
88 }
89 }
90}
91
92#[derive(Clone, Debug)]
93pub struct ObjectDependency {
94 pub object_id: ObjectId,
95 pub referenced_object_id: ObjectId,
96 pub referenced_object_type: PbObjectType,
97}
98
99impl From<PbObjectDependency> for ObjectDependency {
100 fn from(dependency: PbObjectDependency) -> Self {
101 let referenced_object_type = PbObjectType::try_from(dependency.referenced_object_type)
102 .unwrap_or(PbObjectType::Unspecified);
103 Self {
104 object_id: dependency.object_id,
105 referenced_object_id: dependency.referenced_object_id,
106 referenced_object_type,
107 }
108 }
109}
110
111pub struct Catalog {
123 database_by_name: HashMap<String, DatabaseCatalog>,
124 db_name_by_id: HashMap<DatabaseId, String>,
125 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
127 table_stats: HummockVersionStats,
128 object_dependencies: HashMap<ObjectId, Vec<ObjectDependency>>,
129}
130
131#[expect(clippy::derivable_impls)]
132impl Default for Catalog {
133 fn default() -> Self {
134 Self {
135 database_by_name: HashMap::new(),
136 db_name_by_id: HashMap::new(),
137 table_by_id: HashMap::new(),
138 table_stats: HummockVersionStats::default(),
139 object_dependencies: HashMap::new(),
140 }
141 }
142}
143
144impl Catalog {
145 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
146 let name = self.db_name_by_id.get(&db_id)?;
147 self.database_by_name.get_mut(name)
148 }
149
150 pub fn clear(&mut self) {
151 self.database_by_name.clear();
152 self.db_name_by_id.clear();
153 self.table_by_id.clear();
154 self.object_dependencies.clear();
155 }
156
157 pub fn create_database(&mut self, db: &PbDatabase) {
158 let name = db.name.clone();
159 let id = db.id;
160
161 self.database_by_name
162 .try_insert(name.clone(), db.into())
163 .unwrap();
164 self.db_name_by_id.try_insert(id, name).unwrap();
165 }
166
167 pub fn create_schema(&mut self, proto: &PbSchema) {
168 let database_id = proto.database_id;
169 let id = proto.id;
170 self.get_database_mut(database_id)
171 .unwrap()
172 .create_schema(proto);
173
174 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
175 self.get_database_mut(database_id)
176 .unwrap()
177 .get_schema_mut(id)
178 .unwrap()
179 .create_sys_table(sys_table);
180 }
181 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
182 sys_view.database_id = database_id;
183 sys_view.schema_id = id;
184 self.get_database_mut(database_id)
185 .unwrap()
186 .get_schema_mut(id)
187 .unwrap()
188 .create_sys_view(Arc::new(sys_view));
189 }
190 }
191
192 pub fn create_table(&mut self, proto: &PbTable) {
193 let table = self
194 .get_database_mut(proto.database_id)
195 .unwrap()
196 .get_schema_mut(proto.schema_id)
197 .unwrap()
198 .create_table(proto);
199 self.table_by_id.insert(proto.id, table);
200 }
201
202 pub fn create_index(&mut self, proto: &PbIndex) {
203 self.get_database_mut(proto.database_id)
204 .unwrap()
205 .get_schema_mut(proto.schema_id)
206 .unwrap()
207 .create_index(proto);
208 }
209
210 pub fn create_source(&mut self, proto: &PbSource) {
211 self.get_database_mut(proto.database_id)
212 .unwrap()
213 .get_schema_mut(proto.schema_id)
214 .unwrap()
215 .create_source(proto);
216 }
217
218 pub fn create_sink(&mut self, proto: &PbSink) {
219 self.get_database_mut(proto.database_id)
220 .unwrap()
221 .get_schema_mut(proto.schema_id)
222 .unwrap()
223 .create_sink(proto);
224 }
225
226 pub fn create_subscription(&mut self, proto: &PbSubscription) {
227 self.get_database_mut(proto.database_id)
228 .unwrap()
229 .get_schema_mut(proto.schema_id)
230 .unwrap()
231 .create_subscription(proto);
232 }
233
234 pub fn create_secret(&mut self, proto: &PbSecret) {
235 self.get_database_mut(proto.database_id)
236 .unwrap()
237 .get_schema_mut(proto.schema_id)
238 .unwrap()
239 .create_secret(proto);
240 }
241
242 pub fn create_view(&mut self, proto: &PbView) {
243 self.get_database_mut(proto.database_id)
244 .unwrap()
245 .get_schema_mut(proto.schema_id)
246 .unwrap()
247 .create_view(proto);
248 }
249
250 pub fn create_function(&mut self, proto: &PbFunction) {
251 self.get_database_mut(proto.database_id)
252 .unwrap()
253 .get_schema_mut(proto.schema_id)
254 .unwrap()
255 .create_function(proto);
256 }
257
258 pub fn create_connection(&mut self, proto: &PbConnection) {
259 self.get_database_mut(proto.database_id)
260 .unwrap()
261 .get_schema_mut(proto.schema_id)
262 .unwrap()
263 .create_connection(proto);
264 }
265
266 pub fn drop_connection(
267 &mut self,
268 db_id: DatabaseId,
269 schema_id: SchemaId,
270 connection_id: ConnectionId,
271 ) {
272 self.get_database_mut(db_id)
273 .unwrap()
274 .get_schema_mut(schema_id)
275 .unwrap()
276 .drop_connection(connection_id);
277 }
278
279 pub fn update_connection(&mut self, proto: &PbConnection) {
280 let database = self.get_database_mut(proto.database_id).unwrap();
281 let schema = database.get_schema_mut(proto.schema_id).unwrap();
282 if schema.get_connection_by_id(proto.id).is_some() {
283 schema.update_connection(proto);
284 } else {
285 schema.create_connection(proto);
287 database
288 .iter_schemas_mut()
289 .find(|schema| {
290 schema.id() != proto.schema_id
291 && schema.get_connection_by_id(proto.id).is_some()
292 })
293 .unwrap()
294 .drop_connection(proto.id);
295 }
296 }
297
298 pub fn update_secret(&mut self, proto: &PbSecret) {
299 let database = self.get_database_mut(proto.database_id).unwrap();
300 let schema = database.get_schema_mut(proto.schema_id).unwrap();
301 let secret_id = proto.id;
302 if schema.get_secret_by_id(secret_id).is_some() {
303 schema.update_secret(proto);
304 } else {
305 schema.create_secret(proto);
307 database
308 .iter_schemas_mut()
309 .find(|schema| {
310 schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
311 })
312 .unwrap()
313 .drop_secret(secret_id);
314 }
315 }
316
317 pub fn drop_database(&mut self, db_id: DatabaseId) {
318 let name = self.db_name_by_id.remove(&db_id).unwrap();
319 let database = self.database_by_name.remove(&name).unwrap();
320 let object_ids: HashSet<ObjectId> = database.iter_object_ids().collect();
321 self.remove_object_dependencies_for_objects(&object_ids);
322 database.iter_all_table_ids().for_each(|table| {
323 self.table_by_id.remove(&table);
324 });
325 }
326
327 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
328 let object_ids = self
329 .get_database_by_id(db_id)
330 .ok()
331 .and_then(|db| db.get_schema_by_id(schema_id))
332 .map(|schema| schema.iter_object_ids().collect::<HashSet<_>>())
333 .unwrap_or_default();
334 self.remove_object_dependencies_for_objects(&object_ids);
335 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
336 }
337
338 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
339 self.remove_object_dependencies_for_object(tb_id.as_object_id());
340 self.table_by_id.remove(&tb_id);
341 self.get_database_mut(db_id)
342 .unwrap()
343 .get_schema_mut(schema_id)
344 .unwrap()
345 .drop_table(tb_id);
346 }
347
348 pub fn update_table(&mut self, proto: &PbTable) {
349 let database = self.get_database_mut(proto.database_id).unwrap();
350 let schema = database.get_schema_mut(proto.schema_id).unwrap();
351 let table = if schema.get_table_by_id(proto.id).is_some() {
352 schema.update_table(proto)
353 } else {
354 let new_table = schema.create_table(proto);
356 database
357 .iter_schemas_mut()
358 .find(|schema| {
359 schema.id() != proto.schema_id
360 && schema.get_created_table_by_id(proto.id).is_some()
361 })
362 .unwrap()
363 .drop_table(proto.id);
364 new_table
365 };
366
367 self.table_by_id.insert(proto.id, table);
368 }
369
370 pub fn update_database(&mut self, proto: &PbDatabase) {
371 let id = proto.id;
372 let name = proto.name.clone();
373
374 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
375 if old_database_name != name {
376 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
377 database.name.clone_from(&name);
378 database.owner = proto.owner;
379 self.database_by_name.insert(name.clone(), database);
380 self.db_name_by_id.insert(id, name);
381 } else {
382 let database = self.get_database_mut(id).unwrap();
383 database.name = name;
384 database.owner = proto.owner;
385 database.barrier_interval_ms = proto.barrier_interval_ms;
386 database.checkpoint_frequency = proto.checkpoint_frequency;
387 }
388 }
389
390 pub fn update_schema(&mut self, proto: &PbSchema) {
391 self.get_database_mut(proto.database_id)
392 .unwrap()
393 .update_schema(proto);
394 }
395
396 pub fn update_index(&mut self, proto: &PbIndex) {
397 let database = self.get_database_mut(proto.database_id).unwrap();
398 let schema = database.get_schema_mut(proto.schema_id).unwrap();
399 if schema.get_index_by_id(proto.id).is_some() {
400 schema.update_index(proto);
401 } else {
402 schema.create_index(proto);
404 database
405 .iter_schemas_mut()
406 .find(|schema| {
407 schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
408 })
409 .unwrap()
410 .drop_index(proto.id);
411 }
412 }
413
414 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
415 self.remove_object_dependencies_for_object(source_id.as_object_id());
416 self.get_database_mut(db_id)
417 .unwrap()
418 .get_schema_mut(schema_id)
419 .unwrap()
420 .drop_source(source_id);
421 }
422
423 pub fn update_source(&mut self, proto: &PbSource) {
424 let database = self.get_database_mut(proto.database_id).unwrap();
425 let schema = database.get_schema_mut(proto.schema_id).unwrap();
426 if schema.get_source_by_id(proto.id).is_some() {
427 schema.update_source(proto);
428 } else {
429 schema.create_source(proto);
431 database
432 .iter_schemas_mut()
433 .find(|schema| {
434 schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
435 })
436 .unwrap()
437 .drop_source(proto.id);
438 }
439 }
440
441 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
442 self.remove_object_dependencies_for_object(sink_id.as_object_id());
443 self.get_database_mut(db_id)
444 .unwrap()
445 .get_schema_mut(schema_id)
446 .unwrap()
447 .drop_sink(sink_id);
448 }
449
450 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
451 self.remove_object_dependencies_for_object(secret_id.as_object_id());
452 self.get_database_mut(db_id)
453 .unwrap()
454 .get_schema_mut(schema_id)
455 .unwrap()
456 .drop_secret(secret_id);
457 }
458
459 pub fn update_sink(&mut self, proto: &PbSink) {
460 let database = self.get_database_mut(proto.database_id).unwrap();
461 let schema = database.get_schema_mut(proto.schema_id).unwrap();
462 if schema.get_sink_by_id(proto.id).is_some() {
463 schema.update_sink(proto);
464 } else {
465 schema.create_sink(proto);
467 database
468 .iter_schemas_mut()
469 .find(|schema| {
470 schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
471 })
472 .unwrap()
473 .drop_sink(proto.id);
474 }
475 }
476
477 pub fn drop_subscription(
478 &mut self,
479 db_id: DatabaseId,
480 schema_id: SchemaId,
481 subscription_id: SubscriptionId,
482 ) {
483 self.remove_object_dependencies_for_object(subscription_id.as_object_id());
484 self.get_database_mut(db_id)
485 .unwrap()
486 .get_schema_mut(schema_id)
487 .unwrap()
488 .drop_subscription(subscription_id);
489 }
490
491 pub fn update_subscription(&mut self, proto: &PbSubscription) {
492 let database = self.get_database_mut(proto.database_id).unwrap();
493 let schema = database.get_schema_mut(proto.schema_id).unwrap();
494 if schema.get_subscription_by_id(proto.id).is_some() {
495 schema.update_subscription(proto);
496 } else {
497 schema.create_subscription(proto);
499 database
500 .iter_schemas_mut()
501 .find(|schema| {
502 schema.id() != proto.schema_id
503 && schema.get_subscription_by_id(proto.id).is_some()
504 })
505 .unwrap()
506 .drop_subscription(proto.id);
507 }
508 }
509
510 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
511 self.remove_object_dependencies_for_object(index_id.as_object_id());
512 self.get_database_mut(db_id)
513 .unwrap()
514 .get_schema_mut(schema_id)
515 .unwrap()
516 .drop_index(index_id);
517 }
518
519 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
520 self.remove_object_dependencies_for_object(view_id.as_object_id());
521 self.get_database_mut(db_id)
522 .unwrap()
523 .get_schema_mut(schema_id)
524 .unwrap()
525 .drop_view(view_id);
526 }
527
528 pub fn update_view(&mut self, proto: &PbView) {
529 let database = self.get_database_mut(proto.database_id).unwrap();
530 let schema = database.get_schema_mut(proto.schema_id).unwrap();
531 if schema.get_view_by_id(proto.id).is_some() {
532 schema.update_view(proto);
533 } else {
534 schema.create_view(proto);
536 database
537 .iter_schemas_mut()
538 .find(|schema| {
539 schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
540 })
541 .unwrap()
542 .drop_view(proto.id);
543 }
544 }
545
546 pub fn drop_function(
547 &mut self,
548 db_id: DatabaseId,
549 schema_id: SchemaId,
550 function_id: FunctionId,
551 ) {
552 self.remove_object_dependencies_for_object(function_id.as_object_id());
553 self.get_database_mut(db_id)
554 .unwrap()
555 .get_schema_mut(schema_id)
556 .unwrap()
557 .drop_function(function_id);
558 }
559
560 pub fn update_function(&mut self, proto: &PbFunction) {
561 let database = self.get_database_mut(proto.database_id).unwrap();
562 let schema = database.get_schema_mut(proto.schema_id).unwrap();
563 if schema.get_function_by_id(proto.id).is_some() {
564 schema.update_function(proto);
565 } else {
566 schema.create_function(proto);
568 database
569 .iter_schemas_mut()
570 .find(|schema| {
571 schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
572 })
573 .unwrap()
574 .drop_function(proto.id);
575 }
576
577 self.get_database_mut(proto.database_id)
578 .unwrap()
579 .get_schema_mut(proto.schema_id)
580 .unwrap()
581 .update_function(proto);
582 }
583
584 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
585 self.database_by_name
586 .get(db_name)
587 .ok_or_else(|| CatalogError::not_found("database", db_name))
588 }
589
590 pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
591 let db_name = self
592 .db_name_by_id
593 .get(&db_id)
594 .ok_or_else(|| CatalogError::not_found("db_id", db_id.to_string()))?;
595 self.database_by_name
596 .get(db_name)
597 .ok_or_else(|| CatalogError::not_found("database", db_name))
598 }
599
600 pub fn find_schema_secret_by_secret_id(
601 &self,
602 db_name: &str,
603 secret_id: SecretId,
604 ) -> CatalogResult<(String, String)> {
605 let db = self.get_database_by_name(db_name)?;
606 let schema_secret = db
607 .iter_schemas()
608 .find_map(|schema| {
609 schema
610 .get_secret_by_id(secret_id)
611 .map(|secret| (schema.name(), secret.name.clone()))
612 })
613 .ok_or_else(|| CatalogError::not_found("secret", secret_id.to_string()))?;
614 Ok(schema_secret)
615 }
616
617 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
618 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
619 }
620
621 pub fn iter_schemas(
622 &self,
623 db_name: &str,
624 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
625 Ok(self.get_database_by_name(db_name)?.iter_schemas())
626 }
627
628 pub fn get_all_database_names(&self) -> Vec<String> {
629 self.database_by_name.keys().cloned().collect_vec()
630 }
631
632 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
633 self.database_by_name.values()
634 }
635
636 pub fn get_schema_by_name(
637 &self,
638 db_name: &str,
639 schema_name: &str,
640 ) -> CatalogResult<&SchemaCatalog> {
641 self.get_database_by_name(db_name)?
642 .get_schema_by_name(schema_name)
643 .ok_or_else(|| CatalogError::not_found("schema", schema_name))
644 }
645
646 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
647 self.get_any_table_by_id(table_id)
648 .map(|table| table.name.clone())
649 }
650
651 pub fn get_schema_by_id(
652 &self,
653 db_id: DatabaseId,
654 schema_id: SchemaId,
655 ) -> CatalogResult<&SchemaCatalog> {
656 self.get_database_by_id(db_id)?
657 .get_schema_by_id(schema_id)
658 .ok_or_else(|| CatalogError::not_found("schema_id", schema_id.to_string()))
659 }
660
661 pub fn first_valid_schema(
663 &self,
664 db_name: &str,
665 search_path: &SearchPath,
666 user_name: &str,
667 ) -> CatalogResult<&SchemaCatalog> {
668 for path in search_path.real_path() {
669 let mut schema_name: &str = path;
670 if schema_name == USER_NAME_WILD_CARD {
671 schema_name = user_name;
672 }
673
674 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
675 return schema_catalog;
676 }
677 }
678 Err(CatalogError::not_found(
679 "first valid schema",
680 "no schema has been selected to create in".to_owned(),
681 ))
682 }
683
684 pub fn get_source_by_id<'a>(
685 &self,
686 db_name: &'a str,
687 schema_path: SchemaPath<'a>,
688 source_id: SourceId,
689 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
690 schema_path
691 .try_find(|schema_name| -> CatalogResult<_> {
692 Ok(self
693 .get_schema_by_name(db_name, schema_name)?
694 .get_source_by_id(source_id))
695 })?
696 .ok_or_else(|| CatalogError::not_found("source", source_id.to_string()))
697 }
698
699 pub fn get_table_by_name<'a>(
700 &self,
701 db_name: &str,
702 schema_path: SchemaPath<'a>,
703 table_name: &str,
704 bind_creating: bool,
705 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
706 schema_path
707 .try_find(|schema_name| -> CatalogResult<_> {
708 Ok(self
709 .get_schema_by_name(db_name, schema_name)?
710 .get_table_by_name(table_name, bind_creating))
711 })?
712 .ok_or_else(|| CatalogError::not_found("table", table_name))
713 }
714
715 pub fn get_any_table_by_name<'a>(
718 &self,
719 db_name: &str,
720 schema_path: SchemaPath<'a>,
721 table_name: &str,
722 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
723 self.get_table_by_name(db_name, schema_path, table_name, true)
724 }
725
726 pub fn get_created_table_by_name<'a>(
729 &self,
730 db_name: &str,
731 schema_path: SchemaPath<'a>,
732 table_name: &str,
733 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
734 self.get_table_by_name(db_name, schema_path, table_name, false)
735 }
736
737 pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
738 self.table_by_id
739 .get(&table_id)
740 .ok_or_else(|| CatalogError::not_found("table id", table_id.to_string()))
741 }
742
743 pub fn get_created_table_by_id_with_db(
745 &self,
746 db_name: &str,
747 table_id: u32,
748 ) -> CatalogResult<&Arc<TableCatalog>> {
749 let table_id = TableId::from(table_id);
750 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
751 if let Some(table) = schema.get_created_table_by_id(table_id) {
752 return Ok(table);
753 }
754 }
755 Err(CatalogError::not_found("table id", table_id.to_string()))
756 }
757
758 pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
759 self.table_by_id.values()
760 }
761
762 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
763 self.table_by_id
764 .values()
765 .filter(|t| t.is_internal_table() && !t.is_created())
766 }
767
768 pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
770 let mut found = false;
771 for database in self.database_by_name.values() {
772 if !found {
773 for schema in database.iter_schemas() {
774 if schema.iter_user_table().any(|t| t.id() == table_id) {
775 found = true;
776 break;
777 }
778 }
779 }
780 }
781
782 if found {
783 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
784 table.name = table_name.to_owned();
785 self.update_table(&table);
786 }
787 }
788
789 #[cfg(test)]
790 pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
791 self.table_by_id.insert(
792 table_id,
793 Arc::new(TableCatalog {
794 fragment_id,
795 ..Default::default()
796 }),
797 );
798 }
799
800 pub fn get_sys_table_by_name(
801 &self,
802 db_name: &str,
803 schema_name: &str,
804 table_name: &str,
805 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
806 self.get_schema_by_name(db_name, schema_name)?
807 .get_system_table_by_name(table_name)
808 .ok_or_else(|| CatalogError::not_found("table", table_name))
809 }
810
811 pub fn get_source_by_name<'a>(
812 &self,
813 db_name: &str,
814 schema_path: SchemaPath<'a>,
815 source_name: &str,
816 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
817 schema_path
818 .try_find(|schema_name| -> CatalogResult<_> {
819 Ok(self
820 .get_schema_by_name(db_name, schema_name)?
821 .get_source_by_name(source_name))
822 })?
823 .ok_or_else(|| CatalogError::not_found("source", source_name))
824 }
825
826 pub fn get_sink_by_name<'a>(
827 &self,
828 db_name: &str,
829 schema_path: SchemaPath<'a>,
830 sink_name: &str,
831 bind_creating: bool,
832 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
833 schema_path
834 .try_find(|schema_name| -> CatalogResult<_> {
835 Ok(self
836 .get_schema_by_name(db_name, schema_name)?
837 .get_sink_by_name(sink_name, bind_creating))
838 })?
839 .ok_or_else(|| CatalogError::not_found("sink", sink_name))
840 }
841
842 pub fn get_any_sink_by_name<'a>(
843 &self,
844 db_name: &str,
845 schema_path: SchemaPath<'a>,
846 sink_name: &str,
847 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
848 self.get_sink_by_name(db_name, schema_path, sink_name, true)
849 }
850
851 pub fn get_created_sink_by_name<'a>(
852 &self,
853 db_name: &str,
854 schema_path: SchemaPath<'a>,
855 sink_name: &str,
856 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
857 self.get_sink_by_name(db_name, schema_path, sink_name, false)
858 }
859
860 pub fn get_subscription_by_name<'a>(
861 &self,
862 db_name: &str,
863 schema_path: SchemaPath<'a>,
864 subscription_name: &str,
865 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
866 schema_path
867 .try_find(|schema_name| -> CatalogResult<_> {
868 Ok(self
869 .get_schema_by_name(db_name, schema_name)?
870 .get_subscription_by_name(subscription_name))
871 })?
872 .ok_or_else(|| CatalogError::not_found("subscription", subscription_name))
873 }
874
875 pub fn get_index_by_name<'a>(
876 &self,
877 db_name: &str,
878 schema_path: SchemaPath<'a>,
879 index_name: &str,
880 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
881 schema_path
882 .try_find(|schema_name| -> CatalogResult<_> {
883 Ok(self
884 .get_schema_by_name(db_name, schema_name)?
885 .get_created_index_by_name(index_name))
886 })?
887 .ok_or_else(|| CatalogError::not_found("index", index_name))
888 }
889
890 pub fn get_any_index_by_name<'a>(
891 &self,
892 db_name: &str,
893 schema_path: SchemaPath<'a>,
894 index_name: &str,
895 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
896 schema_path
897 .try_find(|schema_name| -> CatalogResult<_> {
898 Ok(self
899 .get_schema_by_name(db_name, schema_name)?
900 .get_any_index_by_name(index_name))
901 })?
902 .ok_or_else(|| CatalogError::not_found("index", index_name))
903 }
904
905 pub fn get_index_by_id(
906 &self,
907 db_name: &str,
908 index_id: u32,
909 ) -> CatalogResult<&Arc<IndexCatalog>> {
910 let index_id = IndexId::from(index_id);
911 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
912 if let Some(index) = schema.get_index_by_id(index_id) {
913 return Ok(index);
914 }
915 }
916 Err(CatalogError::not_found("index", index_id.to_string()))
917 }
918
919 pub fn get_view_by_name<'a>(
920 &self,
921 db_name: &str,
922 schema_path: SchemaPath<'a>,
923 view_name: &str,
924 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
925 schema_path
926 .try_find(|schema_name| -> CatalogResult<_> {
927 Ok(self
928 .get_schema_by_name(db_name, schema_name)?
929 .get_view_by_name(view_name))
930 })?
931 .ok_or_else(|| CatalogError::not_found("view", view_name))
932 }
933
934 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
935 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
936 if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
937 return Ok(view.clone());
938 }
939 }
940 Err(CatalogError::not_found("view", view_id.to_string()))
941 }
942
943 pub fn get_secret_by_name<'a>(
944 &self,
945 db_name: &str,
946 schema_path: SchemaPath<'a>,
947 secret_name: &str,
948 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
949 schema_path
950 .try_find(|schema_name| -> CatalogResult<_> {
951 Ok(self
952 .get_schema_by_name(db_name, schema_name)?
953 .get_secret_by_name(secret_name))
954 })?
955 .ok_or_else(|| CatalogError::not_found("secret", secret_name))
956 }
957
958 pub fn get_connection_by_id(
959 &self,
960 db_name: &str,
961 connection_id: ConnectionId,
962 ) -> CatalogResult<&Arc<ConnectionCatalog>> {
963 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
964 if let Some(conn) = schema.get_connection_by_id(connection_id) {
965 return Ok(conn);
966 }
967 }
968 Err(CatalogError::not_found(
969 "connection",
970 connection_id.to_string(),
971 ))
972 }
973
974 pub fn get_connection_by_name<'a>(
975 &self,
976 db_name: &str,
977 schema_path: SchemaPath<'a>,
978 connection_name: &str,
979 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
980 schema_path
981 .try_find(|schema_name| -> CatalogResult<_> {
982 Ok(self
983 .get_schema_by_name(db_name, schema_name)?
984 .get_connection_by_name(connection_name))
985 })?
986 .ok_or_else(|| CatalogError::not_found("connection", connection_name))
987 }
988
989 pub fn get_function_by_name_inputs<'a>(
990 &self,
991 db_name: &str,
992 schema_path: SchemaPath<'a>,
993 function_name: &str,
994 inputs: &mut [ExprImpl],
995 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
996 schema_path
997 .try_find(|schema_name| -> CatalogResult<_> {
998 Ok(self
999 .get_schema_by_name(db_name, schema_name)?
1000 .get_function_by_name_inputs(function_name, inputs))
1001 })?
1002 .ok_or_else(|| {
1003 CatalogError::not_found(
1004 "function",
1005 format!(
1006 "{}({})",
1007 function_name,
1008 inputs
1009 .iter()
1010 .map(|a| a.return_type().to_string())
1011 .join(", ")
1012 ),
1013 )
1014 })
1015 }
1016
1017 pub fn get_function_by_name_args<'a>(
1018 &self,
1019 db_name: &str,
1020 schema_path: SchemaPath<'a>,
1021 function_name: &str,
1022 args: &[DataType],
1023 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1024 schema_path
1025 .try_find(|schema_name| -> CatalogResult<_> {
1026 Ok(self
1027 .get_schema_by_name(db_name, schema_name)?
1028 .get_function_by_name_args(function_name, args))
1029 })?
1030 .ok_or_else(|| {
1031 CatalogError::not_found(
1032 "function",
1033 format!(
1034 "{}({})",
1035 function_name,
1036 args.iter().map(|a| a.to_string()).join(", ")
1037 ),
1038 )
1039 })
1040 }
1041
1042 pub fn get_functions_by_name<'a>(
1044 &self,
1045 db_name: &str,
1046 schema_path: SchemaPath<'a>,
1047 function_name: &str,
1048 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1049 schema_path
1050 .try_find(|schema_name| -> CatalogResult<_> {
1051 Ok(self
1052 .get_schema_by_name(db_name, schema_name)?
1053 .get_functions_by_name(function_name))
1054 })?
1055 .ok_or_else(|| CatalogError::not_found("function", function_name))
1056 }
1057
1058 pub fn check_relation_name_duplicated(
1060 &self,
1061 db_name: &str,
1062 schema_name: &str,
1063 relation_name: &str,
1064 ) -> CatalogResult<()> {
1065 let schema = self.get_schema_by_name(db_name, schema_name)?;
1066
1067 if let Some(table) = schema.get_any_table_by_name(relation_name) {
1068 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1069 if table.is_index() {
1070 Err(CatalogError::duplicated_under_creation(
1071 "index",
1072 relation_name.to_owned(),
1073 is_creating,
1074 ))
1075 } else if table.is_mview() {
1076 Err(CatalogError::duplicated_under_creation(
1077 "materialized view",
1078 relation_name.to_owned(),
1079 is_creating,
1080 ))
1081 } else {
1082 Err(CatalogError::duplicated_under_creation(
1083 "table",
1084 relation_name.to_owned(),
1085 is_creating,
1086 ))
1087 }
1088 } else if schema.get_source_by_name(relation_name).is_some() {
1089 Err(CatalogError::duplicated("source", relation_name))
1090 } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1091 Err(CatalogError::duplicated_under_creation(
1092 "sink",
1093 relation_name.to_owned(),
1094 !sink.is_created(),
1095 ))
1096 } else if schema.get_view_by_name(relation_name).is_some() {
1097 Err(CatalogError::duplicated("view", relation_name))
1098 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1099 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1100 Err(CatalogError::duplicated_under_creation(
1101 "subscription",
1102 relation_name.to_owned(),
1103 is_not_created,
1104 ))
1105 } else {
1106 Ok(())
1107 }
1108 }
1109
1110 pub fn check_function_name_duplicated(
1111 &self,
1112 db_name: &str,
1113 schema_name: &str,
1114 function_name: &str,
1115 arg_types: &[DataType],
1116 ) -> CatalogResult<()> {
1117 let schema = self.get_schema_by_name(db_name, schema_name)?;
1118
1119 if schema
1120 .get_function_by_name_args(function_name, arg_types)
1121 .is_some()
1122 {
1123 let name = format!(
1124 "{function_name}({})",
1125 arg_types.iter().map(|t| t.to_string()).join(",")
1126 );
1127 Err(CatalogError::duplicated("function", name))
1128 } else {
1129 Ok(())
1130 }
1131 }
1132
1133 pub fn check_connection_name_duplicated(
1135 &self,
1136 db_name: &str,
1137 schema_name: &str,
1138 connection_name: &str,
1139 ) -> CatalogResult<()> {
1140 let schema = self.get_schema_by_name(db_name, schema_name)?;
1141
1142 if schema.get_connection_by_name(connection_name).is_some() {
1143 Err(CatalogError::duplicated("connection", connection_name))
1144 } else {
1145 Ok(())
1146 }
1147 }
1148
1149 pub fn check_secret_name_duplicated(
1150 &self,
1151 db_name: &str,
1152 schema_name: &str,
1153 secret_name: &str,
1154 ) -> CatalogResult<()> {
1155 let schema = self.get_schema_by_name(db_name, schema_name)?;
1156
1157 if schema.get_secret_by_name(secret_name).is_some() {
1158 Err(CatalogError::duplicated("secret", secret_name))
1159 } else {
1160 Ok(())
1161 }
1162 }
1163
1164 pub fn table_stats(&self) -> &HummockVersionStats {
1165 &self.table_stats
1166 }
1167
1168 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1169 self.table_stats = table_stats;
1170 }
1171
1172 pub fn set_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1173 self.object_dependencies.clear();
1174 self.insert_object_dependencies(dependencies);
1175 }
1176
1177 pub fn insert_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1178 let mut grouped: HashMap<ObjectId, Vec<ObjectDependency>> = HashMap::new();
1179 for dependency in dependencies {
1180 let dependency = ObjectDependency::from(dependency);
1181 grouped
1182 .entry(dependency.object_id)
1183 .or_default()
1184 .push(dependency);
1185 }
1186 for (object_id, deps) in grouped {
1187 self.object_dependencies.insert(object_id, deps);
1188 }
1189 }
1190
1191 pub fn iter_object_dependencies(&self) -> impl Iterator<Item = &ObjectDependency> {
1192 self.object_dependencies
1193 .values()
1194 .flat_map(|deps| deps.iter())
1195 }
1196
1197 fn remove_object_dependencies_for_object(&mut self, object_id: ObjectId) {
1198 let mut object_ids = HashSet::new();
1199 object_ids.insert(object_id);
1200 self.remove_object_dependencies_for_objects(&object_ids);
1201 }
1202
1203 fn remove_object_dependencies_for_objects(&mut self, object_ids: &HashSet<ObjectId>) {
1204 if object_ids.is_empty() {
1205 return;
1206 }
1207 self.object_dependencies.retain(|object_id, deps| {
1208 if object_ids.contains(object_id) {
1209 return false;
1210 }
1211 deps.retain(|dep| !object_ids.contains(&dep.referenced_object_id));
1212 !deps.is_empty()
1213 });
1214 }
1215
1216 pub fn get_all_indexes_related_to_object(
1217 &self,
1218 db_id: DatabaseId,
1219 schema_id: SchemaId,
1220 mv_id: TableId,
1221 ) -> Vec<Arc<IndexCatalog>> {
1222 self.get_database_by_id(db_id)
1223 .unwrap()
1224 .get_schema_by_id(schema_id)
1225 .unwrap()
1226 .get_any_indexes_by_table_id(mv_id)
1227 }
1228
1229 pub fn get_id_by_class_name(
1230 &self,
1231 db_name: &str,
1232 schema_path: SchemaPath<'_>,
1233 class_name: &str,
1234 ) -> CatalogResult<ObjectId> {
1235 schema_path
1236 .try_find(|schema_name| -> CatalogResult<_> {
1237 let schema = self.get_schema_by_name(db_name, schema_name)?;
1238 #[allow(clippy::manual_map)]
1239 if let Some(item) = schema.get_system_table_by_name(class_name) {
1240 Ok(Some(item.id().as_object_id()))
1241 } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1242 Ok(Some(item.id().as_object_id()))
1243 } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1244 Ok(Some(item.id.as_object_id()))
1245 } else if let Some(item) = schema.get_source_by_name(class_name) {
1246 Ok(Some(item.id.as_object_id()))
1247 } else if let Some(item) = schema.get_view_by_name(class_name) {
1248 Ok(Some(item.id.as_object_id()))
1249 } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1250 Ok(Some(item.id.as_object_id()))
1251 } else {
1252 Ok(None)
1253 }
1254 })?
1255 .map(|(id, _)| id)
1256 .ok_or_else(|| CatalogError::not_found("class", class_name))
1257 }
1258}