risingwave_frontend/handler/
alter_user.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::{DEFAULT_SUPER_USER_FOR_ADMIN, is_reserved_admin_user};
17use risingwave_pb::user::UserInfo;
18use risingwave_pb::user::update_user_request::UpdateField;
19use risingwave_sqlparser::ast::{AlterUserStatement, ObjectName, UserOption, UserOptions};
20
21use super::RwPgResponse;
22use crate::binder::Binder;
23use crate::catalog::CatalogError;
24use crate::error::ErrorCode::{self, InternalError, PermissionDenied};
25use crate::error::Result;
26use crate::handler::HandlerArgs;
27use crate::user::user_authentication::{
28 OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
29};
30use crate::user::user_catalog::UserCatalog;
31
32fn alter_prost_user_info(
33 mut user_info: UserInfo,
34 options: &UserOptions,
35 session_user: &UserCatalog,
36) -> Result<(UserInfo, Vec<UpdateField>, Option<String>)> {
37 let change_self_password_only = session_user.id == user_info.id
38 && options.0.len() == 1
39 && matches!(
40 &options.0[0],
41 UserOption::EncryptedPassword(_) | UserOption::Password(_) | UserOption::OAuth(_)
42 );
43
44 if !change_self_password_only && is_reserved_admin_user(&user_info.name) {
45 return Err(PermissionDenied(
47 format!("{} cannot be altered", DEFAULT_SUPER_USER_FOR_ADMIN).to_owned(),
48 )
49 .into());
50 }
51
52 if !session_user.is_super {
53 let require_super = user_info.is_super
54 || options
55 .0
56 .iter()
57 .any(|option| matches!(option, UserOption::SuperUser | UserOption::NoSuperUser));
58 if require_super {
59 return Err(PermissionDenied(
60 "must be superuser to alter superuser roles or change superuser attribute"
61 .to_owned(),
62 )
63 .into());
64 }
65 if !session_user.can_create_user && !change_self_password_only {
66 return Err(PermissionDenied("permission denied to alter user".to_owned()).into());
67 }
68 }
69
70 let mut update_fields = Vec::new();
71 let mut notice = None;
72 for option in &options.0 {
73 match option {
74 UserOption::SuperUser => {
75 user_info.is_super = true;
76 update_fields.push(UpdateField::Super);
77 }
78 UserOption::NoSuperUser => {
79 user_info.is_super = false;
80 update_fields.push(UpdateField::Super);
81 }
82 UserOption::CreateDB => {
83 user_info.can_create_db = true;
84 update_fields.push(UpdateField::CreateDb);
85 }
86 UserOption::NoCreateDB => {
87 user_info.can_create_db = false;
88 update_fields.push(UpdateField::CreateDb);
89 }
90 UserOption::CreateUser => {
91 user_info.can_create_user = true;
92 update_fields.push(UpdateField::CreateUser);
93 }
94 UserOption::NoCreateUser => {
95 user_info.can_create_user = false;
96 update_fields.push(UpdateField::CreateUser);
97 }
98 UserOption::Login => {
99 user_info.can_login = true;
100 update_fields.push(UpdateField::Login);
101 }
102 UserOption::NoLogin => {
103 user_info.can_login = false;
104 update_fields.push(UpdateField::Login);
105 }
106 UserOption::EncryptedPassword(p) => {
107 if !p.0.is_empty() {
108 user_info.auth_info = encrypted_password(&user_info.name, &p.0);
109 } else {
110 user_info.auth_info = None;
111 notice =
112 Some("empty string is not a valid password, clearing password".to_owned());
113 };
114 update_fields.push(UpdateField::AuthInfo);
115 }
116 UserOption::Password(opt) => {
117 if let Some(password) = opt
118 && !password.0.is_empty()
119 {
120 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
121 } else {
122 user_info.auth_info = None;
123 notice =
124 Some("empty string is not a valid password, clearing password".to_owned());
125 }
126 update_fields.push(UpdateField::AuthInfo);
127 }
128 UserOption::OAuth(options) => {
129 let auth_info = build_oauth_info(options).ok_or_else(|| {
130 ErrorCode::InvalidParameterValue(format!(
131 "{} and {} must be provided",
132 OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
133 ))
134 })?;
135 user_info.auth_info = Some(auth_info);
136 update_fields.push(UpdateField::AuthInfo)
137 }
138 }
139 }
140 Ok((user_info, update_fields, notice))
141}
142
143fn alter_rename_prost_user_info(
144 mut user_info: UserInfo,
145 new_name: ObjectName,
146 session_user: &UserCatalog,
147) -> Result<(UserInfo, Vec<UpdateField>)> {
148 if session_user.id == user_info.id {
149 return Err(InternalError("session user cannot be renamed".to_owned()).into());
150 }
151
152 if !session_user.is_super {
153 return Err(PermissionDenied("must be superuser to rename users".to_owned()).into());
154 }
155
156 let new_name = Binder::resolve_user_name(new_name)?;
157 if is_reserved_admin_user(&new_name) {
158 return Err(PermissionDenied(
159 format!("{} is reserved for admin", DEFAULT_SUPER_USER_FOR_ADMIN).to_owned(),
160 )
161 .into());
162 }
163 if is_reserved_admin_user(&user_info.name) {
164 return Err(PermissionDenied(
165 format!("{} cannot be renamed", DEFAULT_SUPER_USER_FOR_ADMIN).to_owned(),
166 )
167 .into());
168 }
169
170 user_info.name = new_name;
171 user_info.auth_info = None;
172 Ok((user_info, vec![UpdateField::Rename, UpdateField::AuthInfo]))
173}
174
175pub async fn handle_alter_user(
176 handler_args: HandlerArgs,
177 stmt: AlterUserStatement,
178) -> Result<RwPgResponse> {
179 let session = handler_args.session;
180 let (user_info, update_fields, notice) = {
181 let user_name = Binder::resolve_user_name(stmt.user_name.clone())?;
182 let user_reader = session.env().user_info_reader().read_guard();
183
184 let old_info = user_reader
185 .get_user_by_name(&user_name)
186 .ok_or(CatalogError::NotFound("user", user_name))?
187 .to_prost();
188
189 let session_user = user_reader
190 .get_user_by_name(&session.user_name())
191 .ok_or_else(|| CatalogError::NotFound("user", session.user_name().to_owned()))?;
192
193 match stmt.mode {
194 risingwave_sqlparser::ast::AlterUserMode::Options(options) => {
195 alter_prost_user_info(old_info, &options, session_user)?
196 }
197 risingwave_sqlparser::ast::AlterUserMode::Rename(new_name) => {
198 let (user_info, fields) =
199 alter_rename_prost_user_info(old_info, new_name, session_user)?;
200 (user_info, fields, None)
201 }
202 }
203 };
204
205 let user_info_writer = session.user_info_writer()?;
206 user_info_writer
207 .update_user(user_info, update_fields)
208 .await?;
209 let response_builder = RwPgResponse::builder(StatementType::UPDATE_USER);
210 if let Some(notice) = notice {
211 Ok(response_builder.notice(notice).into())
212 } else {
213 Ok(response_builder.into())
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use std::collections::HashMap;
220
221 use risingwave_pb::user::AuthInfo;
222 use risingwave_pb::user::auth_info::EncryptionType;
223
224 use crate::test_utils::LocalFrontend;
225
226 #[tokio::test]
227 async fn test_alter_user() {
228 let frontend = LocalFrontend::new(Default::default()).await;
229 let session = frontend.session_ref();
230 let user_info_reader = session.env().user_info_reader();
231
232 frontend.run_sql("CREATE USER userB WITH SUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
233 frontend
234 .run_sql("ALTER USER userB RENAME TO user")
235 .await
236 .unwrap();
237 assert!(
238 user_info_reader
239 .read_guard()
240 .get_user_by_name("userB")
241 .is_none()
242 );
243 assert!(
244 user_info_reader
245 .read_guard()
246 .get_user_by_name("user")
247 .is_some()
248 );
249
250 frontend.run_sql("ALTER USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md59f2fa6a30871a92249bdd2f1eeee4ef6'").await.unwrap();
251
252 let user_info = user_info_reader
253 .read_guard()
254 .get_user_by_name("user")
255 .cloned()
256 .unwrap();
257 assert!(!user_info.is_super);
258 assert!(user_info.can_create_db);
259 assert_eq!(
260 user_info.auth_info,
261 Some(AuthInfo {
262 encryption_type: EncryptionType::Md5 as i32,
263 encrypted_value: b"9f2fa6a30871a92249bdd2f1eeee4ef6".to_vec(),
264 metadata: HashMap::new(),
265 })
266 );
267 }
268}