1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{FunctionId, IndexId, StreamJobStatus, TableId};
20use risingwave_common::id::ObjectId;
21use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
22use risingwave_common::types::DataType;
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{
25 PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
26 PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::common::PbObjectType;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::meta::ObjectDependency as PbObjectDependency;
31
32use super::function_catalog::FunctionCatalog;
33use super::source_catalog::SourceCatalog;
34use super::subscription_catalog::{SubscriptionCatalog, SubscriptionState};
35use super::view_catalog::ViewCatalog;
36use super::{
37 CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId,
38};
39use crate::catalog::connection_catalog::ConnectionCatalog;
40use crate::catalog::database_catalog::DatabaseCatalog;
41use crate::catalog::schema_catalog::SchemaCatalog;
42use crate::catalog::secret_catalog::SecretCatalog;
43use crate::catalog::system_catalog::{
44 SystemTableCatalog, get_sys_tables_in_schema, get_sys_views_in_schema,
45};
46use crate::catalog::table_catalog::TableCatalog;
47use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
48use crate::expr::{Expr, ExprImpl};
49
50#[derive(Copy, Clone)]
51pub enum SchemaPath<'a> {
52 Name(&'a str),
53 Path(&'a SearchPath, &'a str),
55}
56
57impl<'a> SchemaPath<'a> {
58 pub fn new(
59 schema_name: Option<&'a str>,
60 search_path: &'a SearchPath,
61 user_name: &'a str,
62 ) -> Self {
63 match schema_name {
64 Some(schema_name) => SchemaPath::Name(schema_name),
65 None => SchemaPath::Path(search_path, user_name),
66 }
67 }
68
69 pub fn try_find<T, E>(
71 &self,
72 mut f: impl FnMut(&str) -> Result<Option<T>, E>,
73 ) -> Result<Option<(T, &'a str)>, E> {
74 match self {
75 SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))),
76 SchemaPath::Path(search_path, user_name) => {
77 for schema_name in search_path.path() {
78 let mut schema_name: &str = schema_name;
79 if schema_name == USER_NAME_WILD_CARD {
80 schema_name = user_name;
81 }
82 if let Ok(Some(res)) = f(schema_name) {
83 return Ok(Some((res, schema_name)));
84 }
85 }
86 Ok(None)
87 }
88 }
89 }
90}
91
92#[derive(Clone, Debug)]
93pub struct ObjectDependency {
94 pub object_id: ObjectId,
95 pub referenced_object_id: ObjectId,
96 pub referenced_object_type: PbObjectType,
97}
98
99impl From<PbObjectDependency> for ObjectDependency {
100 fn from(dependency: PbObjectDependency) -> Self {
101 let referenced_object_type = PbObjectType::try_from(dependency.referenced_object_type)
102 .unwrap_or(PbObjectType::Unspecified);
103 Self {
104 object_id: dependency.object_id,
105 referenced_object_id: dependency.referenced_object_id,
106 referenced_object_type,
107 }
108 }
109}
110
111pub struct Catalog {
123 database_by_name: HashMap<String, DatabaseCatalog>,
124 db_name_by_id: HashMap<DatabaseId, String>,
125 table_by_id: HashMap<TableId, Arc<TableCatalog>>,
127 table_stats: HummockVersionStats,
128 object_dependencies: HashMap<ObjectId, Vec<ObjectDependency>>,
129}
130
131#[expect(clippy::derivable_impls)]
132impl Default for Catalog {
133 fn default() -> Self {
134 Self {
135 database_by_name: HashMap::new(),
136 db_name_by_id: HashMap::new(),
137 table_by_id: HashMap::new(),
138 table_stats: HummockVersionStats::default(),
139 object_dependencies: HashMap::new(),
140 }
141 }
142}
143
144impl Catalog {
145 fn get_database_mut(&mut self, db_id: DatabaseId) -> Option<&mut DatabaseCatalog> {
146 let name = self.db_name_by_id.get(&db_id)?;
147 self.database_by_name.get_mut(name)
148 }
149
150 pub fn clear(&mut self) {
151 self.database_by_name.clear();
152 self.db_name_by_id.clear();
153 self.table_by_id.clear();
154 self.object_dependencies.clear();
155 }
156
157 pub fn create_database(&mut self, db: &PbDatabase) {
158 let name = db.name.clone();
159 let id = db.id;
160
161 self.database_by_name
162 .try_insert(name.clone(), db.into())
163 .unwrap();
164 self.db_name_by_id.try_insert(id, name).unwrap();
165 }
166
167 pub fn create_schema(&mut self, proto: &PbSchema) {
168 let database_id = proto.database_id;
169 let id = proto.id;
170 self.get_database_mut(database_id)
171 .unwrap()
172 .create_schema(proto);
173
174 for sys_table in get_sys_tables_in_schema(proto.name.as_str()) {
175 self.get_database_mut(database_id)
176 .unwrap()
177 .get_schema_mut(id)
178 .unwrap()
179 .create_sys_table(sys_table);
180 }
181 for mut sys_view in get_sys_views_in_schema(proto.name.as_str()) {
182 sys_view.database_id = database_id;
183 sys_view.schema_id = id;
184 self.get_database_mut(database_id)
185 .unwrap()
186 .get_schema_mut(id)
187 .unwrap()
188 .create_sys_view(Arc::new(sys_view));
189 }
190 }
191
192 pub fn create_table(&mut self, proto: &PbTable) {
193 let table = self
194 .get_database_mut(proto.database_id)
195 .unwrap()
196 .get_schema_mut(proto.schema_id)
197 .unwrap()
198 .create_table(proto);
199 self.table_by_id.insert(proto.id, table);
200 }
201
202 pub fn create_index(&mut self, proto: &PbIndex) {
203 self.get_database_mut(proto.database_id)
204 .unwrap()
205 .get_schema_mut(proto.schema_id)
206 .unwrap()
207 .create_index(proto);
208 }
209
210 pub fn create_source(&mut self, proto: &PbSource) {
211 self.get_database_mut(proto.database_id)
212 .unwrap()
213 .get_schema_mut(proto.schema_id)
214 .unwrap()
215 .create_source(proto);
216 }
217
218 pub fn create_sink(&mut self, proto: &PbSink) {
219 self.get_database_mut(proto.database_id)
220 .unwrap()
221 .get_schema_mut(proto.schema_id)
222 .unwrap()
223 .create_sink(proto);
224 }
225
226 pub fn create_subscription(&mut self, proto: &PbSubscription) {
227 self.get_database_mut(proto.database_id)
228 .unwrap()
229 .get_schema_mut(proto.schema_id)
230 .unwrap()
231 .create_subscription(proto);
232 }
233
234 pub fn create_secret(&mut self, proto: &PbSecret) {
235 self.get_database_mut(proto.database_id)
236 .unwrap()
237 .get_schema_mut(proto.schema_id)
238 .unwrap()
239 .create_secret(proto);
240 }
241
242 pub fn create_view(&mut self, proto: &PbView) {
243 self.get_database_mut(proto.database_id)
244 .unwrap()
245 .get_schema_mut(proto.schema_id)
246 .unwrap()
247 .create_view(proto);
248 }
249
250 pub fn create_function(&mut self, proto: &PbFunction) {
251 self.get_database_mut(proto.database_id)
252 .unwrap()
253 .get_schema_mut(proto.schema_id)
254 .unwrap()
255 .create_function(proto);
256 }
257
258 pub fn create_connection(&mut self, proto: &PbConnection) {
259 self.get_database_mut(proto.database_id)
260 .unwrap()
261 .get_schema_mut(proto.schema_id)
262 .unwrap()
263 .create_connection(proto);
264 }
265
266 pub fn drop_connection(
267 &mut self,
268 db_id: DatabaseId,
269 schema_id: SchemaId,
270 connection_id: ConnectionId,
271 ) {
272 self.get_database_mut(db_id)
273 .unwrap()
274 .get_schema_mut(schema_id)
275 .unwrap()
276 .drop_connection(connection_id);
277 }
278
279 pub fn update_connection(&mut self, proto: &PbConnection) {
280 let database = self.get_database_mut(proto.database_id).unwrap();
281 let schema = database.get_schema_mut(proto.schema_id).unwrap();
282 if schema.get_connection_by_id(proto.id).is_some() {
283 schema.update_connection(proto);
284 } else {
285 schema.create_connection(proto);
287 database
288 .iter_schemas_mut()
289 .find(|schema| {
290 schema.id() != proto.schema_id
291 && schema.get_connection_by_id(proto.id).is_some()
292 })
293 .unwrap()
294 .drop_connection(proto.id);
295 }
296 }
297
298 pub fn update_secret(&mut self, proto: &PbSecret) {
299 let database = self.get_database_mut(proto.database_id).unwrap();
300 let schema = database.get_schema_mut(proto.schema_id).unwrap();
301 let secret_id = proto.id;
302 if schema.get_secret_by_id(secret_id).is_some() {
303 schema.update_secret(proto);
304 } else {
305 schema.create_secret(proto);
307 database
308 .iter_schemas_mut()
309 .find(|schema| {
310 schema.id() != proto.schema_id && schema.get_secret_by_id(secret_id).is_some()
311 })
312 .unwrap()
313 .drop_secret(secret_id);
314 }
315 }
316
317 pub fn drop_database(&mut self, db_id: DatabaseId) {
318 let name = self.db_name_by_id.remove(&db_id).unwrap();
319 let database = self.database_by_name.remove(&name).unwrap();
320 let object_ids: HashSet<ObjectId> = database.iter_object_ids().collect();
321 self.remove_object_dependencies_for_objects(&object_ids);
322 database.iter_all_table_ids().for_each(|table| {
323 self.table_by_id.remove(&table);
324 });
325 }
326
327 pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
328 let object_ids = self
329 .get_database_by_id(db_id)
330 .ok()
331 .and_then(|db| db.get_schema_by_id(schema_id))
332 .map(|schema| schema.iter_object_ids().collect::<HashSet<_>>())
333 .unwrap_or_default();
334 self.remove_object_dependencies_for_objects(&object_ids);
335 self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
336 }
337
338 pub fn drop_table(&mut self, db_id: DatabaseId, schema_id: SchemaId, tb_id: TableId) {
339 self.remove_object_dependencies_for_object(tb_id.as_object_id());
340 self.table_by_id.remove(&tb_id);
341 self.get_database_mut(db_id)
342 .unwrap()
343 .get_schema_mut(schema_id)
344 .unwrap()
345 .drop_table(tb_id);
346 }
347
348 pub fn update_table(&mut self, proto: &PbTable) {
349 let database = self.get_database_mut(proto.database_id).unwrap();
350 let schema = database.get_schema_mut(proto.schema_id).unwrap();
351 let table = if schema.get_table_by_id(proto.id).is_some() {
352 schema.update_table(proto)
353 } else {
354 let new_table = schema.create_table(proto);
356 database
357 .iter_schemas_mut()
358 .find(|schema| {
359 schema.id() != proto.schema_id
360 && schema.get_created_table_by_id(proto.id).is_some()
361 })
362 .unwrap()
363 .drop_table(proto.id);
364 new_table
365 };
366
367 self.table_by_id.insert(proto.id, table);
368 }
369
370 pub fn update_database(&mut self, proto: &PbDatabase) {
371 let id = proto.id;
372 let name = proto.name.clone();
373
374 let old_database_name = self.db_name_by_id.get(&id).unwrap().to_owned();
375 if old_database_name != name {
376 let mut database = self.database_by_name.remove(&old_database_name).unwrap();
377 database.name.clone_from(&name);
378 database.owner = proto.owner;
379 database.resource_group.clone_from(&proto.resource_group);
380 database.barrier_interval_ms = proto.barrier_interval_ms;
381 database.checkpoint_frequency = proto.checkpoint_frequency;
382 self.database_by_name.insert(name.clone(), database);
383 self.db_name_by_id.insert(id, name);
384 } else {
385 let database = self.get_database_mut(id).unwrap();
386 database.name = name;
387 database.owner = proto.owner;
388 database.resource_group = proto.resource_group.clone();
389 database.barrier_interval_ms = proto.barrier_interval_ms;
390 database.checkpoint_frequency = proto.checkpoint_frequency;
391 }
392 }
393
394 pub fn update_schema(&mut self, proto: &PbSchema) {
395 self.get_database_mut(proto.database_id)
396 .unwrap()
397 .update_schema(proto);
398 }
399
400 pub fn update_index(&mut self, proto: &PbIndex) {
401 let database = self.get_database_mut(proto.database_id).unwrap();
402 let schema = database.get_schema_mut(proto.schema_id).unwrap();
403 if schema.get_index_by_id(proto.id).is_some() {
404 schema.update_index(proto);
405 } else {
406 schema.create_index(proto);
408 database
409 .iter_schemas_mut()
410 .find(|schema| {
411 schema.id() != proto.schema_id && schema.get_index_by_id(proto.id).is_some()
412 })
413 .unwrap()
414 .drop_index(proto.id);
415 }
416 }
417
418 pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
419 self.remove_object_dependencies_for_object(source_id.as_object_id());
420 self.get_database_mut(db_id)
421 .unwrap()
422 .get_schema_mut(schema_id)
423 .unwrap()
424 .drop_source(source_id);
425 }
426
427 pub fn update_source(&mut self, proto: &PbSource) {
428 let database = self.get_database_mut(proto.database_id).unwrap();
429 let schema = database.get_schema_mut(proto.schema_id).unwrap();
430 if schema.get_source_by_id(proto.id).is_some() {
431 schema.update_source(proto);
432 } else {
433 schema.create_source(proto);
435 database
436 .iter_schemas_mut()
437 .find(|schema| {
438 schema.id() != proto.schema_id && schema.get_source_by_id(proto.id).is_some()
439 })
440 .unwrap()
441 .drop_source(proto.id);
442 }
443 }
444
445 pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
446 self.remove_object_dependencies_for_object(sink_id.as_object_id());
447 self.get_database_mut(db_id)
448 .unwrap()
449 .get_schema_mut(schema_id)
450 .unwrap()
451 .drop_sink(sink_id);
452 }
453
454 pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) {
455 self.remove_object_dependencies_for_object(secret_id.as_object_id());
456 self.get_database_mut(db_id)
457 .unwrap()
458 .get_schema_mut(schema_id)
459 .unwrap()
460 .drop_secret(secret_id);
461 }
462
463 pub fn update_sink(&mut self, proto: &PbSink) {
464 let database = self.get_database_mut(proto.database_id).unwrap();
465 let schema = database.get_schema_mut(proto.schema_id).unwrap();
466 if schema.get_sink_by_id(proto.id).is_some() {
467 schema.update_sink(proto);
468 } else {
469 schema.create_sink(proto);
471 database
472 .iter_schemas_mut()
473 .find(|schema| {
474 schema.id() != proto.schema_id && schema.get_sink_by_id(proto.id).is_some()
475 })
476 .unwrap()
477 .drop_sink(proto.id);
478 }
479 }
480
481 pub fn drop_subscription(
482 &mut self,
483 db_id: DatabaseId,
484 schema_id: SchemaId,
485 subscription_id: SubscriptionId,
486 ) {
487 self.remove_object_dependencies_for_object(subscription_id.as_object_id());
488 self.get_database_mut(db_id)
489 .unwrap()
490 .get_schema_mut(schema_id)
491 .unwrap()
492 .drop_subscription(subscription_id);
493 }
494
495 pub fn update_subscription(&mut self, proto: &PbSubscription) {
496 let database = self.get_database_mut(proto.database_id).unwrap();
497 let schema = database.get_schema_mut(proto.schema_id).unwrap();
498 if schema.get_subscription_by_id(proto.id).is_some() {
499 schema.update_subscription(proto);
500 } else {
501 schema.create_subscription(proto);
503 database
504 .iter_schemas_mut()
505 .find(|schema| {
506 schema.id() != proto.schema_id
507 && schema.get_subscription_by_id(proto.id).is_some()
508 })
509 .unwrap()
510 .drop_subscription(proto.id);
511 }
512 }
513
514 pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
515 self.remove_object_dependencies_for_object(index_id.as_object_id());
516 self.get_database_mut(db_id)
517 .unwrap()
518 .get_schema_mut(schema_id)
519 .unwrap()
520 .drop_index(index_id);
521 }
522
523 pub fn drop_view(&mut self, db_id: DatabaseId, schema_id: SchemaId, view_id: ViewId) {
524 self.remove_object_dependencies_for_object(view_id.as_object_id());
525 self.get_database_mut(db_id)
526 .unwrap()
527 .get_schema_mut(schema_id)
528 .unwrap()
529 .drop_view(view_id);
530 }
531
532 pub fn update_view(&mut self, proto: &PbView) {
533 let database = self.get_database_mut(proto.database_id).unwrap();
534 let schema = database.get_schema_mut(proto.schema_id).unwrap();
535 if schema.get_view_by_id(proto.id).is_some() {
536 schema.update_view(proto);
537 } else {
538 schema.create_view(proto);
540 database
541 .iter_schemas_mut()
542 .find(|schema| {
543 schema.id() != proto.schema_id && schema.get_view_by_id(proto.id).is_some()
544 })
545 .unwrap()
546 .drop_view(proto.id);
547 }
548 }
549
550 pub fn drop_function(
551 &mut self,
552 db_id: DatabaseId,
553 schema_id: SchemaId,
554 function_id: FunctionId,
555 ) {
556 self.remove_object_dependencies_for_object(function_id.as_object_id());
557 self.get_database_mut(db_id)
558 .unwrap()
559 .get_schema_mut(schema_id)
560 .unwrap()
561 .drop_function(function_id);
562 }
563
564 pub fn update_function(&mut self, proto: &PbFunction) {
565 let database = self.get_database_mut(proto.database_id).unwrap();
566 let schema = database.get_schema_mut(proto.schema_id).unwrap();
567 if schema.get_function_by_id(proto.id).is_some() {
568 schema.update_function(proto);
569 } else {
570 schema.create_function(proto);
572 database
573 .iter_schemas_mut()
574 .find(|schema| {
575 schema.id() != proto.schema_id && schema.get_function_by_id(proto.id).is_some()
576 })
577 .unwrap()
578 .drop_function(proto.id);
579 }
580
581 self.get_database_mut(proto.database_id)
582 .unwrap()
583 .get_schema_mut(proto.schema_id)
584 .unwrap()
585 .update_function(proto);
586 }
587
588 pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
589 self.database_by_name
590 .get(db_name)
591 .ok_or_else(|| CatalogError::not_found("database", db_name))
592 }
593
594 pub fn get_database_by_id(&self, db_id: DatabaseId) -> CatalogResult<&DatabaseCatalog> {
595 let db_name = self
596 .db_name_by_id
597 .get(&db_id)
598 .ok_or_else(|| CatalogError::not_found("db_id", db_id.to_string()))?;
599 self.database_by_name
600 .get(db_name)
601 .ok_or_else(|| CatalogError::not_found("database", db_name))
602 }
603
604 pub fn find_schema_secret_by_secret_id(
605 &self,
606 db_name: &str,
607 secret_id: SecretId,
608 ) -> CatalogResult<(String, String)> {
609 let db = self.get_database_by_name(db_name)?;
610 let schema_secret = db
611 .iter_schemas()
612 .find_map(|schema| {
613 schema
614 .get_secret_by_id(secret_id)
615 .map(|secret| (schema.name(), secret.name.clone()))
616 })
617 .ok_or_else(|| CatalogError::not_found("secret", secret_id.to_string()))?;
618 Ok(schema_secret)
619 }
620
621 pub fn get_all_schema_names(&self, db_name: &str) -> CatalogResult<Vec<String>> {
622 Ok(self.get_database_by_name(db_name)?.get_all_schema_names())
623 }
624
625 pub fn iter_schemas(
626 &self,
627 db_name: &str,
628 ) -> CatalogResult<impl Iterator<Item = &SchemaCatalog>> {
629 Ok(self.get_database_by_name(db_name)?.iter_schemas())
630 }
631
632 pub fn get_all_database_names(&self) -> Vec<String> {
633 self.database_by_name.keys().cloned().collect_vec()
634 }
635
636 pub fn iter_databases(&self) -> impl Iterator<Item = &DatabaseCatalog> {
637 self.database_by_name.values()
638 }
639
640 pub fn get_schema_by_name(
641 &self,
642 db_name: &str,
643 schema_name: &str,
644 ) -> CatalogResult<&SchemaCatalog> {
645 self.get_database_by_name(db_name)?
646 .get_schema_by_name(schema_name)
647 .ok_or_else(|| CatalogError::not_found("schema", schema_name))
648 }
649
650 pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
651 self.get_any_table_by_id(table_id)
652 .map(|table| table.name.clone())
653 }
654
655 pub fn get_schema_by_id(
656 &self,
657 db_id: DatabaseId,
658 schema_id: SchemaId,
659 ) -> CatalogResult<&SchemaCatalog> {
660 self.get_database_by_id(db_id)?
661 .get_schema_by_id(schema_id)
662 .ok_or_else(|| CatalogError::not_found("schema_id", schema_id.to_string()))
663 }
664
665 pub fn first_valid_schema(
667 &self,
668 db_name: &str,
669 search_path: &SearchPath,
670 user_name: &str,
671 ) -> CatalogResult<&SchemaCatalog> {
672 for path in search_path.real_path() {
673 let mut schema_name: &str = path;
674 if schema_name == USER_NAME_WILD_CARD {
675 schema_name = user_name;
676 }
677
678 if let schema_catalog @ Ok(_) = self.get_schema_by_name(db_name, schema_name) {
679 return schema_catalog;
680 }
681 }
682 Err(CatalogError::not_found(
683 "first valid schema",
684 "no schema has been selected to create in".to_owned(),
685 ))
686 }
687
688 pub fn get_source_by_id<'a>(
689 &self,
690 db_name: &'a str,
691 schema_path: SchemaPath<'a>,
692 source_id: SourceId,
693 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
694 schema_path
695 .try_find(|schema_name| -> CatalogResult<_> {
696 Ok(self
697 .get_schema_by_name(db_name, schema_name)?
698 .get_source_by_id(source_id))
699 })?
700 .ok_or_else(|| CatalogError::not_found("source", source_id.to_string()))
701 }
702
703 pub fn get_table_by_name<'a>(
704 &self,
705 db_name: &str,
706 schema_path: SchemaPath<'a>,
707 table_name: &str,
708 bind_creating: bool,
709 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
710 schema_path
711 .try_find(|schema_name| -> CatalogResult<_> {
712 Ok(self
713 .get_schema_by_name(db_name, schema_name)?
714 .get_table_by_name(table_name, bind_creating))
715 })?
716 .ok_or_else(|| CatalogError::not_found("table", table_name))
717 }
718
719 pub fn get_any_table_by_name<'a>(
722 &self,
723 db_name: &str,
724 schema_path: SchemaPath<'a>,
725 table_name: &str,
726 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
727 self.get_table_by_name(db_name, schema_path, table_name, true)
728 }
729
730 pub fn get_created_table_by_name<'a>(
733 &self,
734 db_name: &str,
735 schema_path: SchemaPath<'a>,
736 table_name: &str,
737 ) -> CatalogResult<(&Arc<TableCatalog>, &'a str)> {
738 self.get_table_by_name(db_name, schema_path, table_name, false)
739 }
740
741 pub fn get_any_table_by_id(&self, table_id: TableId) -> CatalogResult<&Arc<TableCatalog>> {
742 self.table_by_id
743 .get(&table_id)
744 .ok_or_else(|| CatalogError::not_found("table id", table_id.to_string()))
745 }
746
747 pub fn get_created_table_by_id_with_db(
749 &self,
750 db_name: &str,
751 table_id: u32,
752 ) -> CatalogResult<&Arc<TableCatalog>> {
753 let table_id = TableId::from(table_id);
754 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
755 if let Some(table) = schema.get_created_table_by_id(table_id) {
756 return Ok(table);
757 }
758 }
759 Err(CatalogError::not_found("table id", table_id.to_string()))
760 }
761
762 pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
763 self.table_by_id.values()
764 }
765
766 pub fn iter_backfilling_internal_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
767 self.table_by_id
768 .values()
769 .filter(|t| t.is_internal_table() && !t.is_created())
770 }
771
772 pub fn alter_table_name_by_id(&mut self, table_id: TableId, table_name: &str) {
774 let mut found = false;
775 for database in self.database_by_name.values() {
776 if !found {
777 for schema in database.iter_schemas() {
778 if schema.iter_user_table().any(|t| t.id() == table_id) {
779 found = true;
780 break;
781 }
782 }
783 }
784 }
785
786 if found {
787 let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
788 table.name = table_name.to_owned();
789 self.update_table(&table);
790 }
791 }
792
793 #[cfg(test)]
794 pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
795 self.table_by_id.insert(
796 table_id,
797 Arc::new(TableCatalog {
798 fragment_id,
799 ..Default::default()
800 }),
801 );
802 }
803
804 pub fn get_sys_table_by_name(
805 &self,
806 db_name: &str,
807 schema_name: &str,
808 table_name: &str,
809 ) -> CatalogResult<&Arc<SystemTableCatalog>> {
810 self.get_schema_by_name(db_name, schema_name)?
811 .get_system_table_by_name(table_name)
812 .ok_or_else(|| CatalogError::not_found("table", table_name))
813 }
814
815 pub fn get_source_by_name<'a>(
816 &self,
817 db_name: &str,
818 schema_path: SchemaPath<'a>,
819 source_name: &str,
820 ) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
821 schema_path
822 .try_find(|schema_name| -> CatalogResult<_> {
823 Ok(self
824 .get_schema_by_name(db_name, schema_name)?
825 .get_source_by_name(source_name))
826 })?
827 .ok_or_else(|| CatalogError::not_found("source", source_name))
828 }
829
830 pub fn get_sink_by_name<'a>(
831 &self,
832 db_name: &str,
833 schema_path: SchemaPath<'a>,
834 sink_name: &str,
835 bind_creating: bool,
836 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
837 schema_path
838 .try_find(|schema_name| -> CatalogResult<_> {
839 Ok(self
840 .get_schema_by_name(db_name, schema_name)?
841 .get_sink_by_name(sink_name, bind_creating))
842 })?
843 .ok_or_else(|| CatalogError::not_found("sink", sink_name))
844 }
845
846 pub fn get_any_sink_by_name<'a>(
847 &self,
848 db_name: &str,
849 schema_path: SchemaPath<'a>,
850 sink_name: &str,
851 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
852 self.get_sink_by_name(db_name, schema_path, sink_name, true)
853 }
854
855 pub fn get_created_sink_by_name<'a>(
856 &self,
857 db_name: &str,
858 schema_path: SchemaPath<'a>,
859 sink_name: &str,
860 ) -> CatalogResult<(&Arc<SinkCatalog>, &'a str)> {
861 self.get_sink_by_name(db_name, schema_path, sink_name, false)
862 }
863
864 pub fn get_subscription_by_name<'a>(
865 &self,
866 db_name: &str,
867 schema_path: SchemaPath<'a>,
868 subscription_name: &str,
869 ) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
870 schema_path
871 .try_find(|schema_name| -> CatalogResult<_> {
872 Ok(self
873 .get_schema_by_name(db_name, schema_name)?
874 .get_subscription_by_name(subscription_name))
875 })?
876 .ok_or_else(|| CatalogError::not_found("subscription", subscription_name))
877 }
878
879 pub fn get_index_by_name<'a>(
880 &self,
881 db_name: &str,
882 schema_path: SchemaPath<'a>,
883 index_name: &str,
884 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
885 schema_path
886 .try_find(|schema_name| -> CatalogResult<_> {
887 Ok(self
888 .get_schema_by_name(db_name, schema_name)?
889 .get_created_index_by_name(index_name))
890 })?
891 .ok_or_else(|| CatalogError::not_found("index", index_name))
892 }
893
894 pub fn get_any_index_by_name<'a>(
895 &self,
896 db_name: &str,
897 schema_path: SchemaPath<'a>,
898 index_name: &str,
899 ) -> CatalogResult<(&Arc<IndexCatalog>, &'a str)> {
900 schema_path
901 .try_find(|schema_name| -> CatalogResult<_> {
902 Ok(self
903 .get_schema_by_name(db_name, schema_name)?
904 .get_any_index_by_name(index_name))
905 })?
906 .ok_or_else(|| CatalogError::not_found("index", index_name))
907 }
908
909 pub fn get_index_by_id(
910 &self,
911 db_name: &str,
912 index_id: u32,
913 ) -> CatalogResult<&Arc<IndexCatalog>> {
914 let index_id = IndexId::from(index_id);
915 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
916 if let Some(index) = schema.get_index_by_id(index_id) {
917 return Ok(index);
918 }
919 }
920 Err(CatalogError::not_found("index", index_id.to_string()))
921 }
922
923 pub fn get_view_by_name<'a>(
924 &self,
925 db_name: &str,
926 schema_path: SchemaPath<'a>,
927 view_name: &str,
928 ) -> CatalogResult<(&Arc<ViewCatalog>, &'a str)> {
929 schema_path
930 .try_find(|schema_name| -> CatalogResult<_> {
931 Ok(self
932 .get_schema_by_name(db_name, schema_name)?
933 .get_view_by_name(view_name))
934 })?
935 .ok_or_else(|| CatalogError::not_found("view", view_name))
936 }
937
938 pub fn get_view_by_id(&self, db_name: &str, view_id: u32) -> CatalogResult<Arc<ViewCatalog>> {
939 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
940 if let Some(view) = schema.get_view_by_id(ViewId::from(view_id)) {
941 return Ok(view.clone());
942 }
943 }
944 Err(CatalogError::not_found("view", view_id.to_string()))
945 }
946
947 pub fn get_secret_by_name<'a>(
948 &self,
949 db_name: &str,
950 schema_path: SchemaPath<'a>,
951 secret_name: &str,
952 ) -> CatalogResult<(&Arc<SecretCatalog>, &'a str)> {
953 schema_path
954 .try_find(|schema_name| -> CatalogResult<_> {
955 Ok(self
956 .get_schema_by_name(db_name, schema_name)?
957 .get_secret_by_name(secret_name))
958 })?
959 .ok_or_else(|| CatalogError::not_found("secret", secret_name))
960 }
961
962 pub fn get_connection_by_id(
963 &self,
964 db_name: &str,
965 connection_id: ConnectionId,
966 ) -> CatalogResult<&Arc<ConnectionCatalog>> {
967 for schema in self.get_database_by_name(db_name)?.iter_schemas() {
968 if let Some(conn) = schema.get_connection_by_id(connection_id) {
969 return Ok(conn);
970 }
971 }
972 Err(CatalogError::not_found(
973 "connection",
974 connection_id.to_string(),
975 ))
976 }
977
978 pub fn get_connection_by_name<'a>(
979 &self,
980 db_name: &str,
981 schema_path: SchemaPath<'a>,
982 connection_name: &str,
983 ) -> CatalogResult<(&Arc<ConnectionCatalog>, &'a str)> {
984 schema_path
985 .try_find(|schema_name| -> CatalogResult<_> {
986 Ok(self
987 .get_schema_by_name(db_name, schema_name)?
988 .get_connection_by_name(connection_name))
989 })?
990 .ok_or_else(|| CatalogError::not_found("connection", connection_name))
991 }
992
993 pub fn get_function_by_name_inputs<'a>(
994 &self,
995 db_name: &str,
996 schema_path: SchemaPath<'a>,
997 function_name: &str,
998 inputs: &mut [ExprImpl],
999 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1000 schema_path
1001 .try_find(|schema_name| -> CatalogResult<_> {
1002 Ok(self
1003 .get_schema_by_name(db_name, schema_name)?
1004 .get_function_by_name_inputs(function_name, inputs))
1005 })?
1006 .ok_or_else(|| {
1007 CatalogError::not_found(
1008 "function",
1009 format!(
1010 "{}({})",
1011 function_name,
1012 inputs
1013 .iter()
1014 .map(|a| a.return_type().to_string())
1015 .join(", ")
1016 ),
1017 )
1018 })
1019 }
1020
1021 pub fn get_function_by_name_args<'a>(
1022 &self,
1023 db_name: &str,
1024 schema_path: SchemaPath<'a>,
1025 function_name: &str,
1026 args: &[DataType],
1027 ) -> CatalogResult<(&Arc<FunctionCatalog>, &'a str)> {
1028 schema_path
1029 .try_find(|schema_name| -> CatalogResult<_> {
1030 Ok(self
1031 .get_schema_by_name(db_name, schema_name)?
1032 .get_function_by_name_args(function_name, args))
1033 })?
1034 .ok_or_else(|| {
1035 CatalogError::not_found(
1036 "function",
1037 format!(
1038 "{}({})",
1039 function_name,
1040 args.iter().map(|a| a.to_string()).join(", ")
1041 ),
1042 )
1043 })
1044 }
1045
1046 pub fn get_functions_by_name<'a>(
1048 &self,
1049 db_name: &str,
1050 schema_path: SchemaPath<'a>,
1051 function_name: &str,
1052 ) -> CatalogResult<(Vec<&Arc<FunctionCatalog>>, &'a str)> {
1053 schema_path
1054 .try_find(|schema_name| -> CatalogResult<_> {
1055 Ok(self
1056 .get_schema_by_name(db_name, schema_name)?
1057 .get_functions_by_name(function_name))
1058 })?
1059 .ok_or_else(|| CatalogError::not_found("function", function_name))
1060 }
1061
1062 pub fn check_relation_name_duplicated(
1064 &self,
1065 db_name: &str,
1066 schema_name: &str,
1067 relation_name: &str,
1068 ) -> CatalogResult<()> {
1069 let schema = self.get_schema_by_name(db_name, schema_name)?;
1070
1071 if let Some(table) = schema.get_any_table_by_name(relation_name) {
1072 let is_creating = table.stream_job_status == StreamJobStatus::Creating;
1073 if table.is_index() {
1074 Err(CatalogError::duplicated_under_creation(
1075 "index",
1076 relation_name.to_owned(),
1077 is_creating,
1078 ))
1079 } else if table.is_mview() {
1080 Err(CatalogError::duplicated_under_creation(
1081 "materialized view",
1082 relation_name.to_owned(),
1083 is_creating,
1084 ))
1085 } else {
1086 Err(CatalogError::duplicated_under_creation(
1087 "table",
1088 relation_name.to_owned(),
1089 is_creating,
1090 ))
1091 }
1092 } else if schema.get_source_by_name(relation_name).is_some() {
1093 Err(CatalogError::duplicated("source", relation_name))
1094 } else if let Some(sink) = schema.get_any_sink_by_name(relation_name) {
1095 Err(CatalogError::duplicated_under_creation(
1096 "sink",
1097 relation_name.to_owned(),
1098 !sink.is_created(),
1099 ))
1100 } else if schema.get_view_by_name(relation_name).is_some() {
1101 Err(CatalogError::duplicated("view", relation_name))
1102 } else if let Some(subscription) = schema.get_subscription_by_name(relation_name) {
1103 let is_not_created = subscription.subscription_state != SubscriptionState::Created;
1104 Err(CatalogError::duplicated_under_creation(
1105 "subscription",
1106 relation_name.to_owned(),
1107 is_not_created,
1108 ))
1109 } else {
1110 Ok(())
1111 }
1112 }
1113
1114 pub fn check_function_name_duplicated(
1115 &self,
1116 db_name: &str,
1117 schema_name: &str,
1118 function_name: &str,
1119 arg_types: &[DataType],
1120 ) -> CatalogResult<()> {
1121 let schema = self.get_schema_by_name(db_name, schema_name)?;
1122
1123 if schema
1124 .get_function_by_name_args(function_name, arg_types)
1125 .is_some()
1126 {
1127 let name = format!(
1128 "{function_name}({})",
1129 arg_types.iter().map(|t| t.to_string()).join(",")
1130 );
1131 Err(CatalogError::duplicated("function", name))
1132 } else {
1133 Ok(())
1134 }
1135 }
1136
1137 pub fn check_connection_name_duplicated(
1139 &self,
1140 db_name: &str,
1141 schema_name: &str,
1142 connection_name: &str,
1143 ) -> CatalogResult<()> {
1144 let schema = self.get_schema_by_name(db_name, schema_name)?;
1145
1146 if schema.get_connection_by_name(connection_name).is_some() {
1147 Err(CatalogError::duplicated("connection", connection_name))
1148 } else {
1149 Ok(())
1150 }
1151 }
1152
1153 pub fn check_secret_name_duplicated(
1154 &self,
1155 db_name: &str,
1156 schema_name: &str,
1157 secret_name: &str,
1158 ) -> CatalogResult<()> {
1159 let schema = self.get_schema_by_name(db_name, schema_name)?;
1160
1161 if schema.get_secret_by_name(secret_name).is_some() {
1162 Err(CatalogError::duplicated("secret", secret_name))
1163 } else {
1164 Ok(())
1165 }
1166 }
1167
1168 pub fn table_stats(&self) -> &HummockVersionStats {
1169 &self.table_stats
1170 }
1171
1172 pub fn set_table_stats(&mut self, table_stats: HummockVersionStats) {
1173 self.table_stats = table_stats;
1174 }
1175
1176 pub fn set_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1177 self.object_dependencies.clear();
1178 self.insert_object_dependencies(dependencies);
1179 }
1180
1181 pub fn insert_object_dependencies(&mut self, dependencies: Vec<PbObjectDependency>) {
1182 let mut grouped: HashMap<ObjectId, Vec<ObjectDependency>> = HashMap::new();
1183 for dependency in dependencies {
1184 let dependency = ObjectDependency::from(dependency);
1185 grouped
1186 .entry(dependency.object_id)
1187 .or_default()
1188 .push(dependency);
1189 }
1190 for (object_id, deps) in grouped {
1191 self.object_dependencies.insert(object_id, deps);
1192 }
1193 }
1194
1195 pub fn iter_object_dependencies(&self) -> impl Iterator<Item = &ObjectDependency> {
1196 self.object_dependencies
1197 .values()
1198 .flat_map(|deps| deps.iter())
1199 }
1200
1201 fn remove_object_dependencies_for_object(&mut self, object_id: ObjectId) {
1202 let mut object_ids = HashSet::new();
1203 object_ids.insert(object_id);
1204 self.remove_object_dependencies_for_objects(&object_ids);
1205 }
1206
1207 fn remove_object_dependencies_for_objects(&mut self, object_ids: &HashSet<ObjectId>) {
1208 if object_ids.is_empty() {
1209 return;
1210 }
1211 self.object_dependencies.retain(|object_id, deps| {
1212 if object_ids.contains(object_id) {
1213 return false;
1214 }
1215 deps.retain(|dep| !object_ids.contains(&dep.referenced_object_id));
1216 !deps.is_empty()
1217 });
1218 }
1219
1220 pub fn get_all_indexes_related_to_object(
1221 &self,
1222 db_id: DatabaseId,
1223 schema_id: SchemaId,
1224 mv_id: TableId,
1225 ) -> Vec<Arc<IndexCatalog>> {
1226 self.get_database_by_id(db_id)
1227 .unwrap()
1228 .get_schema_by_id(schema_id)
1229 .unwrap()
1230 .get_any_indexes_by_table_id(mv_id)
1231 }
1232
1233 pub fn get_id_by_class_name(
1234 &self,
1235 db_name: &str,
1236 schema_path: SchemaPath<'_>,
1237 class_name: &str,
1238 ) -> CatalogResult<ObjectId> {
1239 schema_path
1240 .try_find(|schema_name| -> CatalogResult<_> {
1241 let schema = self.get_schema_by_name(db_name, schema_name)?;
1242
1243 if let Some(item) = schema.get_system_table_by_name(class_name) {
1244 Ok(Some(item.id().as_object_id()))
1245 } else if let Some(item) = schema.get_any_table_by_name(class_name) {
1246 Ok(Some(item.id().as_object_id()))
1247 } else if let Some(item) = schema.get_any_index_by_name(class_name) {
1248 Ok(Some(item.id.as_object_id()))
1249 } else if let Some(item) = schema.get_source_by_name(class_name) {
1250 Ok(Some(item.id.as_object_id()))
1251 } else if let Some(item) = schema.get_view_by_name(class_name) {
1252 Ok(Some(item.id.as_object_id()))
1253 } else if let Some(item) = schema.get_any_sink_by_name(class_name) {
1254 Ok(Some(item.id.as_object_id()))
1255 } else {
1256 Ok(None)
1257 }
1258 })?
1259 .map(|(id, _)| id)
1260 .ok_or_else(|| CatalogError::not_found("class", class_name))
1261 }
1262}