risingwave_meta_service/
user_service.rs1use itertools::Itertools;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::{SchemaId, UserId};
18use risingwave_pb::user::alter_default_privilege_request::Operation;
19use risingwave_pb::user::update_user_request::UpdateField;
20use risingwave_pb::user::user_service_server::UserService;
21use risingwave_pb::user::{
22 AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse, CreateUserRequest,
23 CreateUserResponse, DropUserRequest, DropUserResponse, GrantPrivilegeRequest,
24 GrantPrivilegeResponse, RevokePrivilegeRequest, RevokePrivilegeResponse, UpdateUserRequest,
25 UpdateUserResponse,
26};
27use tonic::{Request, Response, Status};
28
29pub struct UserServiceImpl {
30 metadata_manager: MetadataManager,
31}
32
33impl UserServiceImpl {
34 pub fn new(metadata_manager: MetadataManager) -> Self {
35 Self { metadata_manager }
36 }
37}
38
39#[async_trait::async_trait]
40impl UserService for UserServiceImpl {
41 async fn create_user(
42 &self,
43 request: Request<CreateUserRequest>,
44 ) -> Result<Response<CreateUserResponse>, Status> {
45 let req = request.into_inner();
46 let version = self
47 .metadata_manager
48 .catalog_controller
49 .create_user(req.get_user()?.clone())
50 .await?;
51
52 Ok(Response::new(CreateUserResponse {
53 status: None,
54 version,
55 }))
56 }
57
58 async fn drop_user(
59 &self,
60 request: Request<DropUserRequest>,
61 ) -> Result<Response<DropUserResponse>, Status> {
62 let req = request.into_inner();
63 let version = self
64 .metadata_manager
65 .catalog_controller
66 .drop_user(req.user_id as _)
67 .await?;
68
69 Ok(Response::new(DropUserResponse {
70 status: None,
71 version,
72 }))
73 }
74
75 async fn update_user(
76 &self,
77 request: Request<UpdateUserRequest>,
78 ) -> Result<Response<UpdateUserResponse>, Status> {
79 let req = request.into_inner();
80 let update_fields = req
81 .update_fields
82 .iter()
83 .map(|i| UpdateField::try_from(*i).unwrap())
84 .collect_vec();
85 let user = req.get_user()?.clone();
86
87 let version = self
88 .metadata_manager
89 .catalog_controller
90 .update_user(user, &update_fields)
91 .await?;
92
93 Ok(Response::new(UpdateUserResponse {
94 status: None,
95 version,
96 }))
97 }
98
99 async fn grant_privilege(
100 &self,
101 request: Request<GrantPrivilegeRequest>,
102 ) -> Result<Response<GrantPrivilegeResponse>, Status> {
103 let req = request.into_inner();
104 let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
105 let version = self
106 .metadata_manager
107 .catalog_controller
108 .grant_privilege(
109 user_ids,
110 req.get_privileges(),
111 req.granted_by as _,
112 req.with_grant_option,
113 )
114 .await?;
115
116 Ok(Response::new(GrantPrivilegeResponse {
117 status: None,
118 version,
119 }))
120 }
121
122 async fn revoke_privilege(
123 &self,
124 request: Request<RevokePrivilegeRequest>,
125 ) -> Result<Response<RevokePrivilegeResponse>, Status> {
126 let req = request.into_inner();
127 let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
128 let version = self
129 .metadata_manager
130 .catalog_controller
131 .revoke_privilege(
132 user_ids,
133 req.get_privileges(),
134 req.granted_by as _,
135 req.revoke_by as _,
136 req.revoke_grant_option,
137 req.cascade,
138 )
139 .await?;
140
141 Ok(Response::new(RevokePrivilegeResponse {
142 status: None,
143 version,
144 }))
145 }
146
147 async fn alter_default_privilege(
148 &self,
149 request: Request<AlterDefaultPrivilegeRequest>,
150 ) -> Result<Response<AlterDefaultPrivilegeResponse>, Status> {
151 let req = request.into_inner();
152 let operation = req.get_operation()?;
153 let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
154 let schema_ids: Vec<_> = req.schema_ids.iter().map(|id| *id as SchemaId).collect();
155 match operation {
156 Operation::GrantPrivilege(grant_privilege) => {
157 self.metadata_manager
158 .catalog_controller
159 .grant_default_privileges(
160 user_ids,
161 req.database_id as _,
162 schema_ids,
163 req.granted_by as _,
164 grant_privilege.actions().collect(),
165 grant_privilege.get_object_type()?,
166 grant_privilege
167 .grantees
168 .iter()
169 .map(|id| *id as UserId)
170 .collect(),
171 grant_privilege.with_grant_option,
172 )
173 .await?
174 }
175 Operation::RevokePrivilege(revoke_privilege) => {
176 self.metadata_manager
177 .catalog_controller
178 .revoke_default_privileges(
179 user_ids,
180 req.database_id as _,
181 schema_ids,
182 revoke_privilege.actions().collect(),
183 revoke_privilege.get_object_type()?,
184 revoke_privilege
185 .grantees
186 .iter()
187 .map(|id| *id as UserId)
188 .collect(),
189 revoke_privilege.revoke_grant_option,
190 )
191 .await?
192 }
193 }
194
195 Ok(Response::new(AlterDefaultPrivilegeResponse {
196 status: None,
197 }))
198 }
199}