risingwave_meta_service/
user_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::{SchemaId, UserId};
18use risingwave_pb::user::alter_default_privilege_request::Operation;
19use risingwave_pb::user::update_user_request::UpdateField;
20use risingwave_pb::user::user_service_server::UserService;
21use risingwave_pb::user::{
22    AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse, CreateUserRequest,
23    CreateUserResponse, DropUserRequest, DropUserResponse, GrantPrivilegeRequest,
24    GrantPrivilegeResponse, RevokePrivilegeRequest, RevokePrivilegeResponse, UpdateUserRequest,
25    UpdateUserResponse,
26};
27use tonic::{Request, Response, Status};
28
/// gRPC service type that implements the `UserService` trait on top of a [`MetadataManager`].
pub struct UserServiceImpl {
    /// Metadata manager whose `catalog_controller` executes every user and privilege request.
    metadata_manager: MetadataManager,
}
32
33impl UserServiceImpl {
34    pub fn new(metadata_manager: MetadataManager) -> Self {
35        Self { metadata_manager }
36    }
37}
38
39#[async_trait::async_trait]
40impl UserService for UserServiceImpl {
41    async fn create_user(
42        &self,
43        request: Request<CreateUserRequest>,
44    ) -> Result<Response<CreateUserResponse>, Status> {
45        let req = request.into_inner();
46        let version = self
47            .metadata_manager
48            .catalog_controller
49            .create_user(req.get_user()?.clone())
50            .await?;
51
52        Ok(Response::new(CreateUserResponse {
53            status: None,
54            version,
55        }))
56    }
57
58    async fn drop_user(
59        &self,
60        request: Request<DropUserRequest>,
61    ) -> Result<Response<DropUserResponse>, Status> {
62        let req = request.into_inner();
63        let version = self
64            .metadata_manager
65            .catalog_controller
66            .drop_user(req.user_id as _)
67            .await?;
68
69        Ok(Response::new(DropUserResponse {
70            status: None,
71            version,
72        }))
73    }
74
75    async fn update_user(
76        &self,
77        request: Request<UpdateUserRequest>,
78    ) -> Result<Response<UpdateUserResponse>, Status> {
79        let req = request.into_inner();
80        let update_fields = req
81            .update_fields
82            .iter()
83            .map(|i| UpdateField::try_from(*i).unwrap())
84            .collect_vec();
85        let user = req.get_user()?.clone();
86
87        let version = self
88            .metadata_manager
89            .catalog_controller
90            .update_user(user, &update_fields)
91            .await?;
92
93        Ok(Response::new(UpdateUserResponse {
94            status: None,
95            version,
96        }))
97    }
98
99    async fn grant_privilege(
100        &self,
101        request: Request<GrantPrivilegeRequest>,
102    ) -> Result<Response<GrantPrivilegeResponse>, Status> {
103        let req = request.into_inner();
104        let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
105        let version = self
106            .metadata_manager
107            .catalog_controller
108            .grant_privilege(
109                user_ids,
110                req.get_privileges(),
111                req.granted_by as _,
112                req.with_grant_option,
113            )
114            .await?;
115
116        Ok(Response::new(GrantPrivilegeResponse {
117            status: None,
118            version,
119        }))
120    }
121
122    async fn revoke_privilege(
123        &self,
124        request: Request<RevokePrivilegeRequest>,
125    ) -> Result<Response<RevokePrivilegeResponse>, Status> {
126        let req = request.into_inner();
127        let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
128        let version = self
129            .metadata_manager
130            .catalog_controller
131            .revoke_privilege(
132                user_ids,
133                req.get_privileges(),
134                req.granted_by as _,
135                req.revoke_by as _,
136                req.revoke_grant_option,
137                req.cascade,
138            )
139            .await?;
140
141        Ok(Response::new(RevokePrivilegeResponse {
142            status: None,
143            version,
144        }))
145    }
146
147    async fn alter_default_privilege(
148        &self,
149        request: Request<AlterDefaultPrivilegeRequest>,
150    ) -> Result<Response<AlterDefaultPrivilegeResponse>, Status> {
151        let req = request.into_inner();
152        let operation = req.get_operation()?;
153        let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
154        let schema_ids: Vec<_> = req.schema_ids.iter().map(|id| *id as SchemaId).collect();
155        match operation {
156            Operation::GrantPrivilege(grant_privilege) => {
157                self.metadata_manager
158                    .catalog_controller
159                    .grant_default_privileges(
160                        user_ids,
161                        req.database_id as _,
162                        schema_ids,
163                        req.granted_by as _,
164                        grant_privilege.actions().collect(),
165                        grant_privilege.get_object_type()?,
166                        grant_privilege
167                            .grantees
168                            .iter()
169                            .map(|id| *id as UserId)
170                            .collect(),
171                        grant_privilege.with_grant_option,
172                    )
173                    .await?
174            }
175            Operation::RevokePrivilege(revoke_privilege) => {
176                self.metadata_manager
177                    .catalog_controller
178                    .revoke_default_privileges(
179                        user_ids,
180                        req.database_id as _,
181                        schema_ids,
182                        revoke_privilege.actions().collect(),
183                        revoke_privilege.get_object_type()?,
184                        revoke_privilege
185                            .grantees
186                            .iter()
187                            .map(|id| *id as UserId)
188                            .collect(),
189                        revoke_privilege.revoke_grant_option,
190                    )
191                    .await?
192            }
193        }
194
195        Ok(Response::new(AlterDefaultPrivilegeResponse {
196            status: None,
197        }))
198    }
199}