risingwave_frontend/catalog/
database_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::id::ObjectId;
19use risingwave_pb::catalog::{PbDatabase, PbSchema};
20
21use super::{OwnedByUserCatalog, OwnedGrantObject};
22use crate::catalog::schema_catalog::SchemaCatalog;
23use crate::catalog::{DatabaseId, SchemaId, TableId};
24use crate::user::UserId;
25
26#[derive(Clone, Debug)]
27pub struct DatabaseCatalog {
28    id: DatabaseId,
29    pub name: String,
30    schema_by_name: HashMap<String, SchemaCatalog>,
31    schema_name_by_id: HashMap<SchemaId, String>,
32    pub owner: UserId,
33    pub resource_group: String,
34    pub barrier_interval_ms: Option<u32>,
35    pub checkpoint_frequency: Option<u64>,
36}
37
38impl DatabaseCatalog {
39    pub fn create_schema(&mut self, proto: &PbSchema) {
40        let name = proto.name.clone();
41        let id = proto.id;
42        let schema = proto.into();
43        self.schema_by_name
44            .try_insert(name.clone(), schema)
45            .unwrap();
46        self.schema_name_by_id.try_insert(id, name).unwrap();
47    }
48
49    pub fn drop_schema(&mut self, schema_id: SchemaId) {
50        let name = self.schema_name_by_id.remove(&schema_id).unwrap();
51        self.schema_by_name.remove(&name).unwrap();
52    }
53
54    pub fn get_all_schema_names(&self) -> Vec<String> {
55        self.schema_by_name.keys().cloned().collect_vec()
56    }
57
58    pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
59        self.schema_by_name
60            .values()
61            .flat_map(|schema| schema.iter_all().map(|t| t.id()))
62    }
63
64    pub fn iter_object_ids(&self) -> impl Iterator<Item = ObjectId> + '_ {
65        self.schema_by_name
66            .values()
67            .flat_map(|schema| schema.iter_object_ids())
68    }
69
70    pub fn iter_schemas(&self) -> impl Iterator<Item = &SchemaCatalog> {
71        self.schema_by_name.values()
72    }
73
74    pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
75        self.schema_by_name.values_mut()
76    }
77
78    pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
79        self.schema_by_name.get(name)
80    }
81
82    pub fn get_schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaCatalog> {
83        self.schema_by_name
84            .get(self.schema_name_by_id.get(&schema_id)?)
85    }
86
87    pub fn get_schema_mut(&mut self, schema_id: SchemaId) -> Option<&mut SchemaCatalog> {
88        let name = self.schema_name_by_id.get(&schema_id).unwrap();
89        self.schema_by_name.get_mut(name)
90    }
91
92    pub fn get_grant_object_by_oid(&self, oid: ObjectId) -> Option<OwnedGrantObject> {
93        for schema in self.schema_by_name.values() {
94            let object = schema.get_grant_object_by_oid(oid);
95            if object.is_some() {
96                return object;
97            }
98        }
99        None
100    }
101
102    pub fn update_schema(&mut self, prost: &PbSchema) {
103        let id = prost.id;
104        let name = prost.name.clone();
105
106        let old_schema_name = self.schema_name_by_id.get(&id).unwrap().to_owned();
107        if old_schema_name != name {
108            let mut schema = self.schema_by_name.remove(&old_schema_name).unwrap();
109            schema.name.clone_from(&name);
110            schema.database_id = prost.database_id;
111            schema.owner = prost.owner;
112            self.schema_by_name.insert(name.clone(), schema);
113            self.schema_name_by_id.insert(id, name);
114        } else {
115            let schema = self.get_schema_mut(id).unwrap();
116            schema.name.clone_from(&name);
117            schema.database_id = prost.database_id;
118            schema.owner = prost.owner;
119        };
120    }
121
122    pub fn id(&self) -> DatabaseId {
123        self.id
124    }
125
126    pub fn name(&self) -> &str {
127        &self.name
128    }
129
130    pub fn to_prost(&self) -> PbDatabase {
131        PbDatabase {
132            id: self.id,
133            name: self.name.clone(),
134            owner: self.owner,
135            resource_group: self.resource_group.clone(),
136            barrier_interval_ms: self.barrier_interval_ms,
137            checkpoint_frequency: self.checkpoint_frequency,
138        }
139    }
140}
141
142impl OwnedByUserCatalog for DatabaseCatalog {
143    fn owner(&self) -> UserId {
144        self.owner
145    }
146}
147
148impl From<&PbDatabase> for DatabaseCatalog {
149    fn from(db: &PbDatabase) -> Self {
150        Self {
151            id: db.id,
152            name: db.name.clone(),
153            schema_by_name: HashMap::new(),
154            schema_name_by_id: HashMap::new(),
155            owner: db.owner,
156            resource_group: db.resource_group.clone(),
157            barrier_interval_ms: db.barrier_interval_ms,
158            checkpoint_frequency: db.checkpoint_frequency,
159        }
160    }
161}