risingwave_frontend/catalog/
database_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_pb::catalog::{PbDatabase, PbSchema};
19
20use super::{OwnedByUserCatalog, OwnedGrantObject};
21use crate::catalog::schema_catalog::SchemaCatalog;
22use crate::catalog::{DatabaseId, SchemaId, TableId};
23use crate::user::UserId;
24
25#[derive(Clone, Debug)]
26pub struct DatabaseCatalog {
27    id: DatabaseId,
28    pub name: String,
29    schema_by_name: HashMap<String, SchemaCatalog>,
30    schema_name_by_id: HashMap<SchemaId, String>,
31    pub owner: u32,
32    pub resource_group: String,
33    pub barrier_interval_ms: Option<u32>,
34    pub checkpoint_frequency: Option<u64>,
35}
36
37impl DatabaseCatalog {
38    pub fn create_schema(&mut self, proto: &PbSchema) {
39        let name = proto.name.clone();
40        let id = proto.id;
41        let schema = proto.into();
42        self.schema_by_name
43            .try_insert(name.clone(), schema)
44            .unwrap();
45        self.schema_name_by_id.try_insert(id, name).unwrap();
46    }
47
48    pub fn drop_schema(&mut self, schema_id: SchemaId) {
49        let name = self.schema_name_by_id.remove(&schema_id).unwrap();
50        self.schema_by_name.remove(&name).unwrap();
51    }
52
53    pub fn get_all_schema_names(&self) -> Vec<String> {
54        self.schema_by_name.keys().cloned().collect_vec()
55    }
56
57    pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
58        self.schema_by_name
59            .values()
60            .flat_map(|schema| schema.iter_all().map(|t| t.id()))
61    }
62
63    pub fn iter_schemas(&self) -> impl Iterator<Item = &SchemaCatalog> {
64        self.schema_by_name.values()
65    }
66
67    pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
68        self.schema_by_name.values_mut()
69    }
70
71    pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
72        self.schema_by_name.get(name)
73    }
74
75    pub fn get_schema_by_id(&self, schema_id: &SchemaId) -> Option<&SchemaCatalog> {
76        self.schema_by_name
77            .get(self.schema_name_by_id.get(schema_id)?)
78    }
79
80    pub fn get_schema_mut(&mut self, schema_id: SchemaId) -> Option<&mut SchemaCatalog> {
81        let name = self.schema_name_by_id.get(&schema_id).unwrap();
82        self.schema_by_name.get_mut(name)
83    }
84
85    pub fn get_grant_object_by_oid(&self, oid: u32) -> Option<OwnedGrantObject> {
86        for schema in self.schema_by_name.values() {
87            let object = schema.get_grant_object_by_oid(oid);
88            if object.is_some() {
89                return object;
90            }
91        }
92        None
93    }
94
95    pub fn update_schema(&mut self, prost: &PbSchema) {
96        let id = prost.id;
97        let name = prost.name.clone();
98
99        let old_schema_name = self.schema_name_by_id.get(&id).unwrap().to_owned();
100        if old_schema_name != name {
101            let mut schema = self.schema_by_name.remove(&old_schema_name).unwrap();
102            schema.name.clone_from(&name);
103            schema.database_id = prost.database_id;
104            schema.owner = prost.owner;
105            self.schema_by_name.insert(name.clone(), schema);
106            self.schema_name_by_id.insert(id, name);
107        } else {
108            let schema = self.get_schema_mut(id).unwrap();
109            schema.name.clone_from(&name);
110            schema.database_id = prost.database_id;
111            schema.owner = prost.owner;
112        };
113    }
114
115    pub fn id(&self) -> DatabaseId {
116        self.id
117    }
118
119    pub fn name(&self) -> &str {
120        &self.name
121    }
122
123    pub fn to_prost(&self) -> PbDatabase {
124        PbDatabase {
125            id: self.id,
126            name: self.name.clone(),
127            owner: self.owner,
128            resource_group: self.resource_group.clone(),
129            barrier_interval_ms: self.barrier_interval_ms,
130            checkpoint_frequency: self.checkpoint_frequency,
131        }
132    }
133}
134
135impl OwnedByUserCatalog for DatabaseCatalog {
136    fn owner(&self) -> UserId {
137        self.owner
138    }
139}
140
141impl From<&PbDatabase> for DatabaseCatalog {
142    fn from(db: &PbDatabase) -> Self {
143        Self {
144            id: db.id,
145            name: db.name.clone(),
146            schema_by_name: HashMap::new(),
147            schema_name_by_id: HashMap::new(),
148            owner: db.owner,
149            resource_group: db.resource_group.clone(),
150            barrier_interval_ms: db.barrier_interval_ms,
151            checkpoint_frequency: db.checkpoint_frequency,
152        }
153    }
154}