risingwave_frontend/catalog/
database_catalog.rs1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::id::ObjectId;
19use risingwave_pb::catalog::{PbDatabase, PbSchema};
20
21use super::{OwnedByUserCatalog, OwnedGrantObject};
22use crate::catalog::schema_catalog::SchemaCatalog;
23use crate::catalog::{DatabaseId, SchemaId, TableId};
24use crate::user::UserId;
25
26#[derive(Clone, Debug)]
27pub struct DatabaseCatalog {
28 id: DatabaseId,
29 pub name: String,
30 schema_by_name: HashMap<String, SchemaCatalog>,
31 schema_name_by_id: HashMap<SchemaId, String>,
32 pub owner: UserId,
33 pub resource_group: String,
34 pub barrier_interval_ms: Option<u32>,
35 pub checkpoint_frequency: Option<u64>,
36}
37
38impl DatabaseCatalog {
39 pub fn create_schema(&mut self, proto: &PbSchema) {
40 let name = proto.name.clone();
41 let id = proto.id;
42 let schema = proto.into();
43 self.schema_by_name
44 .try_insert(name.clone(), schema)
45 .unwrap();
46 self.schema_name_by_id.try_insert(id, name).unwrap();
47 }
48
49 pub fn drop_schema(&mut self, schema_id: SchemaId) {
50 let name = self.schema_name_by_id.remove(&schema_id).unwrap();
51 self.schema_by_name.remove(&name).unwrap();
52 }
53
54 pub fn get_all_schema_names(&self) -> Vec<String> {
55 self.schema_by_name.keys().cloned().collect_vec()
56 }
57
58 pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
59 self.schema_by_name
60 .values()
61 .flat_map(|schema| schema.iter_all().map(|t| t.id()))
62 }
63
64 pub fn iter_object_ids(&self) -> impl Iterator<Item = ObjectId> + '_ {
65 self.schema_by_name
66 .values()
67 .flat_map(|schema| schema.iter_object_ids())
68 }
69
70 pub fn iter_schemas(&self) -> impl Iterator<Item = &SchemaCatalog> {
71 self.schema_by_name.values()
72 }
73
74 pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
75 self.schema_by_name.values_mut()
76 }
77
78 pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
79 self.schema_by_name.get(name)
80 }
81
82 pub fn get_schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaCatalog> {
83 self.schema_by_name
84 .get(self.schema_name_by_id.get(&schema_id)?)
85 }
86
87 pub fn get_schema_mut(&mut self, schema_id: SchemaId) -> Option<&mut SchemaCatalog> {
88 let name = self.schema_name_by_id.get(&schema_id).unwrap();
89 self.schema_by_name.get_mut(name)
90 }
91
92 pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
93 for schema in self.schema_by_name.values() {
94 let object = schema.get_grant_object_by_oid(oid);
95 if object.is_some() {
96 return object;
97 }
98 }
99 None
100 }
101
102 pub fn update_schema(&mut self, prost: &PbSchema) {
103 let id = prost.id;
104 let name = prost.name.clone();
105
106 let old_schema_name = self.schema_name_by_id.get(&id).unwrap().to_owned();
107 if old_schema_name != name {
108 let mut schema = self.schema_by_name.remove(&old_schema_name).unwrap();
109 schema.name.clone_from(&name);
110 schema.database_id = prost.database_id;
111 schema.owner = prost.owner;
112 self.schema_by_name.insert(name.clone(), schema);
113 self.schema_name_by_id.insert(id, name);
114 } else {
115 let schema = self.get_schema_mut(id).unwrap();
116 schema.name.clone_from(&name);
117 schema.database_id = prost.database_id;
118 schema.owner = prost.owner;
119 };
120 }
121
122 pub fn id(&self) -> DatabaseId {
123 self.id
124 }
125
126 pub fn name(&self) -> &str {
127 &self.name
128 }
129
130 pub fn to_prost(&self) -> PbDatabase {
131 PbDatabase {
132 id: self.id,
133 name: self.name.clone(),
134 owner: self.owner,
135 resource_group: self.resource_group.clone(),
136 barrier_interval_ms: self.barrier_interval_ms,
137 checkpoint_frequency: self.checkpoint_frequency,
138 }
139 }
140}
141
142impl OwnedByUserCatalog for DatabaseCatalog {
143 fn owner(&self) -> UserId {
144 self.owner
145 }
146}
147
148impl From<&PbDatabase> for DatabaseCatalog {
149 fn from(db: &PbDatabase) -> Self {
150 Self {
151 id: db.id,
152 name: db.name.clone(),
153 schema_by_name: HashMap::new(),
154 schema_name_by_id: HashMap::new(),
155 owner: db.owner,
156 resource_group: db.resource_group.clone(),
157 barrier_interval_ms: db.barrier_interval_ms,
158 checkpoint_frequency: db.checkpoint_frequency,
159 }
160 }
161}