risingwave_meta_service/
user_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::UserId;
18use risingwave_pb::user::update_user_request::UpdateField;
19use risingwave_pb::user::user_service_server::UserService;
20use risingwave_pb::user::{
21    CreateUserRequest, CreateUserResponse, DropUserRequest, DropUserResponse,
22    GrantPrivilegeRequest, GrantPrivilegeResponse, RevokePrivilegeRequest, RevokePrivilegeResponse,
23    UpdateUserRequest, UpdateUserResponse,
24};
25use tonic::{Request, Response, Status};
26
27pub struct UserServiceImpl {
28    metadata_manager: MetadataManager,
29}
30
31impl UserServiceImpl {
32    pub fn new(metadata_manager: MetadataManager) -> Self {
33        Self { metadata_manager }
34    }
35}
36
37#[async_trait::async_trait]
38impl UserService for UserServiceImpl {
39    #[cfg_attr(coverage, coverage(off))]
40    async fn create_user(
41        &self,
42        request: Request<CreateUserRequest>,
43    ) -> Result<Response<CreateUserResponse>, Status> {
44        let req = request.into_inner();
45        let version = self
46            .metadata_manager
47            .catalog_controller
48            .create_user(req.get_user()?.clone())
49            .await?;
50
51        Ok(Response::new(CreateUserResponse {
52            status: None,
53            version,
54        }))
55    }
56
57    #[cfg_attr(coverage, coverage(off))]
58    async fn drop_user(
59        &self,
60        request: Request<DropUserRequest>,
61    ) -> Result<Response<DropUserResponse>, Status> {
62        let req = request.into_inner();
63        let version = self
64            .metadata_manager
65            .catalog_controller
66            .drop_user(req.user_id as _)
67            .await?;
68
69        Ok(Response::new(DropUserResponse {
70            status: None,
71            version,
72        }))
73    }
74
75    #[cfg_attr(coverage, coverage(off))]
76    async fn update_user(
77        &self,
78        request: Request<UpdateUserRequest>,
79    ) -> Result<Response<UpdateUserResponse>, Status> {
80        let req = request.into_inner();
81        let update_fields = req
82            .update_fields
83            .iter()
84            .map(|i| UpdateField::try_from(*i).unwrap())
85            .collect_vec();
86        let user = req.get_user()?.clone();
87
88        let version = self
89            .metadata_manager
90            .catalog_controller
91            .update_user(user, &update_fields)
92            .await?;
93
94        Ok(Response::new(UpdateUserResponse {
95            status: None,
96            version,
97        }))
98    }
99
100    #[cfg_attr(coverage, coverage(off))]
101    async fn grant_privilege(
102        &self,
103        request: Request<GrantPrivilegeRequest>,
104    ) -> Result<Response<GrantPrivilegeResponse>, Status> {
105        let req = request.into_inner();
106        let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
107        let version = self
108            .metadata_manager
109            .catalog_controller
110            .grant_privilege(
111                user_ids,
112                req.get_privileges(),
113                req.granted_by as _,
114                req.with_grant_option,
115            )
116            .await?;
117
118        Ok(Response::new(GrantPrivilegeResponse {
119            status: None,
120            version,
121        }))
122    }
123
124    #[cfg_attr(coverage, coverage(off))]
125    async fn revoke_privilege(
126        &self,
127        request: Request<RevokePrivilegeRequest>,
128    ) -> Result<Response<RevokePrivilegeResponse>, Status> {
129        let req = request.into_inner();
130        let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
131        let version = self
132            .metadata_manager
133            .catalog_controller
134            .revoke_privilege(
135                user_ids,
136                req.get_privileges(),
137                req.granted_by as _,
138                req.revoke_by as _,
139                req.revoke_grant_option,
140                req.cascade,
141            )
142            .await?;
143
144        Ok(Response::new(RevokePrivilegeResponse {
145            status: None,
146            version,
147        }))
148    }
149}