risingwave_frontend/catalog/
database_catalog.rs1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_pb::catalog::{PbDatabase, PbSchema};
19use risingwave_pb::user::grant_privilege::Object;
20
21use super::OwnedByUserCatalog;
22use crate::catalog::schema_catalog::SchemaCatalog;
23use crate::catalog::{DatabaseId, SchemaId, TableId};
24use crate::user::UserId;
25
26#[derive(Clone, Debug)]
27pub struct DatabaseCatalog {
28 id: DatabaseId,
29 pub name: String,
30 schema_by_name: HashMap<String, SchemaCatalog>,
31 schema_name_by_id: HashMap<SchemaId, String>,
32 pub owner: u32,
33 pub resource_group: String,
34 pub barrier_interval_ms: Option<u32>,
35 pub checkpoint_frequency: Option<u64>,
36}
37
38impl DatabaseCatalog {
39 pub fn create_schema(&mut self, proto: &PbSchema) {
40 let name = proto.name.clone();
41 let id = proto.id;
42 let schema = proto.into();
43 self.schema_by_name
44 .try_insert(name.clone(), schema)
45 .unwrap();
46 self.schema_name_by_id.try_insert(id, name).unwrap();
47 }
48
49 pub fn drop_schema(&mut self, schema_id: SchemaId) {
50 let name = self.schema_name_by_id.remove(&schema_id).unwrap();
51 self.schema_by_name.remove(&name).unwrap();
52 }
53
54 pub fn get_all_schema_names(&self) -> Vec<String> {
55 self.schema_by_name.keys().cloned().collect_vec()
56 }
57
58 pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
59 self.schema_by_name
60 .values()
61 .flat_map(|schema| schema.iter_all().map(|t| t.id()))
62 }
63
64 pub fn iter_schemas(&self) -> impl Iterator<Item = &SchemaCatalog> {
65 self.schema_by_name.values()
66 }
67
68 pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
69 self.schema_by_name.values_mut()
70 }
71
72 pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
73 self.schema_by_name.get(name)
74 }
75
76 pub fn get_schema_by_id(&self, schema_id: &SchemaId) -> Option<&SchemaCatalog> {
77 self.schema_by_name
78 .get(self.schema_name_by_id.get(schema_id)?)
79 }
80
81 pub fn get_schema_mut(&mut self, schema_id: SchemaId) -> Option<&mut SchemaCatalog> {
82 let name = self.schema_name_by_id.get(&schema_id).unwrap();
83 self.schema_by_name.get_mut(name)
84 }
85
86 pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<Object> {
87 for schema in self.schema_by_name.values() {
88 let object = schema.get_grant_object_by_oid(oid);
89 if object.is_some() {
90 return object;
91 }
92 }
93 None
94 }
95
96 pub fn update_schema(&mut self, prost: &PbSchema) {
97 let id = prost.id;
98 let name = prost.name.clone();
99
100 let old_schema_name = self.schema_name_by_id.get(&id).unwrap().to_owned();
101 if old_schema_name != name {
102 let mut schema = self.schema_by_name.remove(&old_schema_name).unwrap();
103 schema.name.clone_from(&name);
104 schema.database_id = prost.database_id;
105 schema.owner = prost.owner;
106 self.schema_by_name.insert(name.clone(), schema);
107 self.schema_name_by_id.insert(id, name);
108 } else {
109 let schema = self.get_schema_mut(id).unwrap();
110 schema.name.clone_from(&name);
111 schema.database_id = prost.database_id;
112 schema.owner = prost.owner;
113 };
114 }
115
116 pub fn id(&self) -> DatabaseId {
117 self.id
118 }
119
120 pub fn name(&self) -> &str {
121 &self.name
122 }
123
124 pub fn to_prost(&self) -> PbDatabase {
125 PbDatabase {
126 id: self.id,
127 name: self.name.clone(),
128 owner: self.owner,
129 resource_group: self.resource_group.clone(),
130 barrier_interval_ms: self.barrier_interval_ms,
131 checkpoint_frequency: self.checkpoint_frequency,
132 }
133 }
134}
135
136impl OwnedByUserCatalog for DatabaseCatalog {
137 fn owner(&self) -> UserId {
138 self.owner
139 }
140}
141
142impl From<&PbDatabase> for DatabaseCatalog {
143 fn from(db: &PbDatabase) -> Self {
144 Self {
145 id: db.id,
146 name: db.name.clone(),
147 schema_by_name: HashMap::new(),
148 schema_name_by_id: HashMap::new(),
149 owner: db.owner,
150 resource_group: db.resource_group.clone(),
151 barrier_interval_ms: db.barrier_interval_ms,
152 checkpoint_frequency: db.checkpoint_frequency,
153 }
154 }
155}