risingwave_meta_service/
user_service.rs1use itertools::Itertools;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::UserId;
18use risingwave_pb::user::update_user_request::UpdateField;
19use risingwave_pb::user::user_service_server::UserService;
20use risingwave_pb::user::{
21 CreateUserRequest, CreateUserResponse, DropUserRequest, DropUserResponse,
22 GrantPrivilegeRequest, GrantPrivilegeResponse, RevokePrivilegeRequest, RevokePrivilegeResponse,
23 UpdateUserRequest, UpdateUserResponse,
24};
25use tonic::{Request, Response, Status};
26
27pub struct UserServiceImpl {
28 metadata_manager: MetadataManager,
29}
30
31impl UserServiceImpl {
32 pub fn new(metadata_manager: MetadataManager) -> Self {
33 Self { metadata_manager }
34 }
35}
36
37#[async_trait::async_trait]
38impl UserService for UserServiceImpl {
39 #[cfg_attr(coverage, coverage(off))]
40 async fn create_user(
41 &self,
42 request: Request<CreateUserRequest>,
43 ) -> Result<Response<CreateUserResponse>, Status> {
44 let req = request.into_inner();
45 let version = self
46 .metadata_manager
47 .catalog_controller
48 .create_user(req.get_user()?.clone())
49 .await?;
50
51 Ok(Response::new(CreateUserResponse {
52 status: None,
53 version,
54 }))
55 }
56
57 #[cfg_attr(coverage, coverage(off))]
58 async fn drop_user(
59 &self,
60 request: Request<DropUserRequest>,
61 ) -> Result<Response<DropUserResponse>, Status> {
62 let req = request.into_inner();
63 let version = self
64 .metadata_manager
65 .catalog_controller
66 .drop_user(req.user_id as _)
67 .await?;
68
69 Ok(Response::new(DropUserResponse {
70 status: None,
71 version,
72 }))
73 }
74
75 #[cfg_attr(coverage, coverage(off))]
76 async fn update_user(
77 &self,
78 request: Request<UpdateUserRequest>,
79 ) -> Result<Response<UpdateUserResponse>, Status> {
80 let req = request.into_inner();
81 let update_fields = req
82 .update_fields
83 .iter()
84 .map(|i| UpdateField::try_from(*i).unwrap())
85 .collect_vec();
86 let user = req.get_user()?.clone();
87
88 let version = self
89 .metadata_manager
90 .catalog_controller
91 .update_user(user, &update_fields)
92 .await?;
93
94 Ok(Response::new(UpdateUserResponse {
95 status: None,
96 version,
97 }))
98 }
99
100 #[cfg_attr(coverage, coverage(off))]
101 async fn grant_privilege(
102 &self,
103 request: Request<GrantPrivilegeRequest>,
104 ) -> Result<Response<GrantPrivilegeResponse>, Status> {
105 let req = request.into_inner();
106 let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
107 let version = self
108 .metadata_manager
109 .catalog_controller
110 .grant_privilege(
111 user_ids,
112 req.get_privileges(),
113 req.granted_by as _,
114 req.with_grant_option,
115 )
116 .await?;
117
118 Ok(Response::new(GrantPrivilegeResponse {
119 status: None,
120 version,
121 }))
122 }
123
124 #[cfg_attr(coverage, coverage(off))]
125 async fn revoke_privilege(
126 &self,
127 request: Request<RevokePrivilegeRequest>,
128 ) -> Result<Response<RevokePrivilegeResponse>, Status> {
129 let req = request.into_inner();
130 let user_ids: Vec<_> = req.get_user_ids().iter().map(|id| *id as UserId).collect();
131 let version = self
132 .metadata_manager
133 .catalog_controller
134 .revoke_privilege(
135 user_ids,
136 req.get_privileges(),
137 req.granted_by as _,
138 req.revoke_by as _,
139 req.revoke_grant_option,
140 req.cascade,
141 )
142 .await?;
143
144 Ok(Response::new(RevokePrivilegeResponse {
145 status: None,
146 version,
147 }))
148 }
149}