risingwave_frontend/catalog/
database_catalog.rs1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_pb::catalog::{PbDatabase, PbSchema};
19
20use super::{OwnedByUserCatalog, OwnedGrantObject};
21use crate::catalog::schema_catalog::SchemaCatalog;
22use crate::catalog::{DatabaseId, SchemaId, TableId};
23use crate::user::UserId;
24
25#[derive(Clone, Debug)]
26pub struct DatabaseCatalog {
27 id: DatabaseId,
28 pub name: String,
29 schema_by_name: HashMap<String, SchemaCatalog>,
30 schema_name_by_id: HashMap<SchemaId, String>,
31 pub owner: u32,
32 pub resource_group: String,
33 pub barrier_interval_ms: Option<u32>,
34 pub checkpoint_frequency: Option<u64>,
35}
36
37impl DatabaseCatalog {
38 pub fn create_schema(&mut self, proto: &PbSchema) {
39 let name = proto.name.clone();
40 let id = proto.id;
41 let schema = proto.into();
42 self.schema_by_name
43 .try_insert(name.clone(), schema)
44 .unwrap();
45 self.schema_name_by_id.try_insert(id, name).unwrap();
46 }
47
48 pub fn drop_schema(&mut self, schema_id: SchemaId) {
49 let name = self.schema_name_by_id.remove(&schema_id).unwrap();
50 self.schema_by_name.remove(&name).unwrap();
51 }
52
53 pub fn get_all_schema_names(&self) -> Vec<String> {
54 self.schema_by_name.keys().cloned().collect_vec()
55 }
56
57 pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
58 self.schema_by_name
59 .values()
60 .flat_map(|schema| schema.iter_all().map(|t| t.id()))
61 }
62
63 pub fn iter_schemas(&self) -> impl Iterator<Item = &SchemaCatalog> {
64 self.schema_by_name.values()
65 }
66
67 pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
68 self.schema_by_name.values_mut()
69 }
70
71 pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
72 self.schema_by_name.get(name)
73 }
74
75 pub fn get_schema_by_id(&self, schema_id: &SchemaId) -> Option<&SchemaCatalog> {
76 self.schema_by_name
77 .get(self.schema_name_by_id.get(schema_id)?)
78 }
79
80 pub fn get_schema_mut(&mut self, schema_id: SchemaId) -> Option<&mut SchemaCatalog> {
81 let name = self.schema_name_by_id.get(&schema_id).unwrap();
82 self.schema_by_name.get_mut(name)
83 }
84
85 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
86 for schema in self.schema_by_name.values() {
87 let object = schema.get_grant_object_by_oid(oid);
88 if object.is_some() {
89 return object;
90 }
91 }
92 None
93 }
94
95 pub fn update_schema(&mut self, prost: &PbSchema) {
96 let id = prost.id;
97 let name = prost.name.clone();
98
99 let old_schema_name = self.schema_name_by_id.get(&id).unwrap().to_owned();
100 if old_schema_name != name {
101 let mut schema = self.schema_by_name.remove(&old_schema_name).unwrap();
102 schema.name.clone_from(&name);
103 schema.database_id = prost.database_id;
104 schema.owner = prost.owner;
105 self.schema_by_name.insert(name.clone(), schema);
106 self.schema_name_by_id.insert(id, name);
107 } else {
108 let schema = self.get_schema_mut(id).unwrap();
109 schema.name.clone_from(&name);
110 schema.database_id = prost.database_id;
111 schema.owner = prost.owner;
112 };
113 }
114
115 pub fn id(&self) -> DatabaseId {
116 self.id
117 }
118
119 pub fn name(&self) -> &str {
120 &self.name
121 }
122
123 pub fn to_prost(&self) -> PbDatabase {
124 PbDatabase {
125 id: self.id,
126 name: self.name.clone(),
127 owner: self.owner,
128 resource_group: self.resource_group.clone(),
129 barrier_interval_ms: self.barrier_interval_ms,
130 checkpoint_frequency: self.checkpoint_frequency,
131 }
132 }
133}
134
135impl OwnedByUserCatalog for DatabaseCatalog {
136 fn owner(&self) -> UserId {
137 self.owner
138 }
139}
140
141impl From<&PbDatabase> for DatabaseCatalog {
142 fn from(db: &PbDatabase) -> Self {
143 Self {
144 id: db.id,
145 name: db.name.clone(),
146 schema_by_name: HashMap::new(),
147 schema_name_by_id: HashMap::new(),
148 owner: db.owner,
149 resource_group: db.resource_group.clone(),
150 barrier_interval_ms: db.barrier_interval_ms,
151 checkpoint_frequency: db.checkpoint_frequency,
152 }
153 }
154}