risingwave_frontend/handler/
alter_user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER_FOR_ADMIN, is_reserved_admin_user};
19use risingwave_pb::user::UserInfo;
20use risingwave_pb::user::update_user_request::UpdateField;
21use risingwave_sqlparser::ast::{AlterUserStatement, ObjectName, UserOption, UserOptions};
22
23use super::RwPgResponse;
24use crate::binder::Binder;
25use crate::catalog::CatalogError;
26use crate::error::ErrorCode::{self, InternalError, PermissionDenied};
27use crate::error::Result;
28use crate::handler::HandlerArgs;
29use crate::user::user_authentication::{
30    OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
31};
32use crate::user::user_catalog::UserCatalog;
33
34fn alter_prost_user_info(
35    mut user_info: UserInfo,
36    options: &UserOptions,
37    session_user: &UserCatalog,
38) -> Result<(UserInfo, Vec<UpdateField>, Vec<String>)> {
39    let change_self_password_only = session_user.id == user_info.id
40        && options.0.len() == 1
41        && matches!(
42            &options.0[0],
43            UserOption::EncryptedPassword(_) | UserOption::Password(_) | UserOption::OAuth(_)
44        );
45    let require_admin = user_info.is_admin
46        || options
47            .0
48            .iter()
49            .any(|option| matches!(option, UserOption::Admin | UserOption::NoAdmin));
50    if !change_self_password_only && require_admin && !session_user.is_admin {
51        return Err(PermissionDenied(format!(
52            "{} cannot be altered with admin option",
53            user_info.name
54        ))
55        .into());
56    }
57
58    if !session_user.is_super {
59        let require_super = user_info.is_super
60            || options
61                .0
62                .iter()
63                .any(|option| matches!(option, UserOption::SuperUser | UserOption::NoSuperUser));
64        if require_super {
65            return Err(PermissionDenied(
66                "must be superuser to alter superuser roles or change superuser attribute"
67                    .to_owned(),
68            )
69            .into());
70        }
71
72        if !session_user.can_create_user && !change_self_password_only {
73            return Err(PermissionDenied("permission denied to alter user".to_owned()).into());
74        }
75    }
76
77    let mut update_fields = HashSet::new();
78    let mut notices = vec![];
79
80    for option in &options.0 {
81        match option {
82            UserOption::SuperUser => {
83                user_info.is_super = true;
84                update_fields.insert(UpdateField::Super);
85            }
86            UserOption::NoSuperUser => {
87                user_info.is_super = false;
88                update_fields.insert(UpdateField::Super);
89            }
90            UserOption::CreateDB => {
91                user_info.can_create_db = true;
92                update_fields.insert(UpdateField::CreateDb);
93            }
94            UserOption::NoCreateDB => {
95                user_info.can_create_db = false;
96                update_fields.insert(UpdateField::CreateDb);
97            }
98            UserOption::CreateUser => {
99                user_info.can_create_user = true;
100                update_fields.insert(UpdateField::CreateUser);
101            }
102            UserOption::NoCreateUser => {
103                user_info.can_create_user = false;
104                update_fields.insert(UpdateField::CreateUser);
105            }
106            UserOption::Login => {
107                user_info.can_login = true;
108                update_fields.insert(UpdateField::Login);
109            }
110            UserOption::NoLogin => {
111                user_info.can_login = false;
112                update_fields.insert(UpdateField::Login);
113            }
114            UserOption::Admin => {
115                user_info.is_admin = true;
116                update_fields.insert(UpdateField::Admin);
117            }
118            UserOption::NoAdmin => {
119                user_info.is_admin = false;
120                update_fields.insert(UpdateField::Admin);
121            }
122            UserOption::EncryptedPassword(p) => {
123                if !p.0.is_empty() {
124                    user_info.auth_info = encrypted_password(&user_info.name, &p.0);
125                } else {
126                    user_info.auth_info = None;
127                    notices
128                        .push("empty string is not a valid password, clearing password".to_owned());
129                };
130                update_fields.insert(UpdateField::AuthInfo);
131            }
132            UserOption::Password(opt) => {
133                if let Some(password) = opt
134                    && !password.0.is_empty()
135                {
136                    user_info.auth_info = encrypted_password(&user_info.name, &password.0);
137                } else {
138                    user_info.auth_info = None;
139                    notices
140                        .push("empty string is not a valid password, clearing password".to_owned());
141                }
142                update_fields.insert(UpdateField::AuthInfo);
143            }
144            UserOption::OAuth(options) => {
145                let auth_info = build_oauth_info(options).ok_or_else(|| {
146                    ErrorCode::InvalidParameterValue(format!(
147                        "{} and {} must be provided",
148                        OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
149                    ))
150                })?;
151                user_info.auth_info = Some(auth_info);
152                update_fields.insert(UpdateField::AuthInfo);
153            }
154        }
155    }
156
157    if user_info.is_admin && !user_info.is_super {
158        user_info.is_super = true;
159        update_fields.insert(UpdateField::Super);
160        if options
161            .0
162            .iter()
163            .any(|option| matches!(option, UserOption::NoSuperUser))
164        {
165            notices.push("admin users must remain superusers, ignoring NOSUPERUSER".to_owned());
166        }
167    }
168
169    Ok((user_info, update_fields.into_iter().collect(), notices))
170}
171
172fn alter_rename_prost_user_info(
173    mut user_info: UserInfo,
174    new_name: ObjectName,
175    session_user: &UserCatalog,
176) -> Result<(UserInfo, Vec<UpdateField>)> {
177    if session_user.id == user_info.id {
178        return Err(InternalError("session user cannot be renamed".to_owned()).into());
179    }
180
181    if !session_user.is_super {
182        return Err(PermissionDenied("must be superuser to rename users".to_owned()).into());
183    }
184
185    let new_name = Binder::resolve_user_name(new_name)?;
186    if is_reserved_admin_user(&new_name) {
187        return Err(PermissionDenied(format!(
188            "{} is reserved for admin",
189            DEFAULT_SUPER_USER_FOR_ADMIN
190        ))
191        .into());
192    }
193    if is_reserved_admin_user(&user_info.name) {
194        return Err(PermissionDenied(format!(
195            "{} cannot be renamed",
196            DEFAULT_SUPER_USER_FOR_ADMIN
197        ))
198        .into());
199    }
200
201    user_info.name = new_name;
202    user_info.auth_info = None;
203    Ok((user_info, vec![UpdateField::Rename, UpdateField::AuthInfo]))
204}
205
206pub async fn handle_alter_user(
207    handler_args: HandlerArgs,
208    stmt: AlterUserStatement,
209) -> Result<RwPgResponse> {
210    let session = handler_args.session;
211    let (user_info, update_fields, notices) = {
212        let user_name = Binder::resolve_user_name(stmt.user_name.clone())?;
213        let user_reader = session.env().user_info_reader().read_guard();
214
215        let old_info = user_reader
216            .get_user_by_name(&user_name)
217            .ok_or(CatalogError::NotFound("user", user_name))?
218            .to_prost();
219
220        let session_user = user_reader
221            .get_user_by_name(&session.user_name())
222            .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
223
224        match stmt.mode {
225            risingwave_sqlparser::ast::AlterUserMode::Options(options) => {
226                alter_prost_user_info(old_info, &options, session_user)?
227            }
228            risingwave_sqlparser::ast::AlterUserMode::Rename(new_name) => {
229                let (user_info, fields) =
230                    alter_rename_prost_user_info(old_info, new_name, session_user)?;
231                (user_info, fields, vec![])
232            }
233        }
234    };
235
236    let user_info_writer = session.user_info_writer()?;
237    user_info_writer
238        .update_user(user_info, update_fields)
239        .await?;
240    let mut response_builder = RwPgResponse::builder(StatementType::UPDATE_USER);
241    for notice in notices {
242        response_builder = response_builder.notice(notice);
243    }
244    Ok(response_builder.into())
245}
246
247#[cfg(test)]
248mod tests {
249    use std::collections::HashMap;
250
251    use risingwave_common::catalog::{
252        DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID,
253    };
254    use risingwave_pb::user::AuthInfo;
255    use risingwave_pb::user::auth_info::EncryptionType;
256
257    use crate::test_utils::LocalFrontend;
258
259    #[tokio::test]
260    async fn test_alter_user() {
261        let frontend = LocalFrontend::new(Default::default()).await;
262        let session = frontend.session_ref();
263        let user_info_reader = session.env().user_info_reader();
264
265        frontend.run_sql("CREATE USER userB WITH SUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
266        frontend
267            .run_sql("ALTER USER userB RENAME TO user")
268            .await
269            .unwrap();
270        assert!(
271            user_info_reader
272                .read_guard()
273                .get_user_by_name("userB")
274                .is_none()
275        );
276        assert!(
277            user_info_reader
278                .read_guard()
279                .get_user_by_name("user")
280                .is_some()
281        );
282
283        frontend.run_sql("ALTER USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md59f2fa6a30871a92249bdd2f1eeee4ef6'").await.unwrap();
284
285        let user_info = user_info_reader
286            .read_guard()
287            .get_user_by_name("user")
288            .cloned()
289            .unwrap();
290        assert!(!user_info.is_super);
291        assert!(user_info.can_create_db);
292        assert_eq!(
293            user_info.auth_info,
294            Some(AuthInfo {
295                encryption_type: EncryptionType::Md5 as i32,
296                encrypted_value: b"9f2fa6a30871a92249bdd2f1eeee4ef6".to_vec(),
297                metadata: HashMap::new(),
298            })
299        );
300    }
301
302    #[tokio::test]
303    async fn test_alter_admin_user() {
304        let frontend = LocalFrontend::new(Default::default()).await;
305        let session = frontend.session_ref();
306        let user_info_reader = session.env().user_info_reader();
307
308        // Create admin user
309        frontend
310            .run_user_sql(
311                "CREATE USER admin_user WITH ADMIN",
312                DEFAULT_DATABASE_NAME.to_owned(),
313                DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
314                DEFAULT_SUPER_USER_FOR_ADMIN_ID,
315            )
316            .await
317            .unwrap();
318
319        let admin_user = user_info_reader
320            .read_guard()
321            .get_user_by_name("admin_user")
322            .cloned()
323            .unwrap();
324
325        // Should be both admin and superuser
326        assert!(admin_user.is_admin);
327        assert!(admin_user.is_super);
328
329        // Try to remove superuser status from admin user - should be ignored with notice
330        frontend
331            .run_user_sql(
332                "ALTER USER admin_user WITH NOSUPERUSER",
333                DEFAULT_DATABASE_NAME.to_owned(),
334                DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
335                DEFAULT_SUPER_USER_FOR_ADMIN_ID,
336            )
337            .await
338            .unwrap();
339
340        let admin_user_after = user_info_reader
341            .read_guard()
342            .get_user_by_name("admin_user")
343            .cloned()
344            .unwrap();
345
346        assert!(admin_user_after.is_admin);
347        assert!(admin_user_after.is_super);
348
349        // Now remove admin and super status from admin user
350        frontend
351            .run_user_sql(
352                "ALTER USER admin_user WITH NOADMIN NOSUPERUSER",
353                DEFAULT_DATABASE_NAME.to_owned(),
354                DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
355                DEFAULT_SUPER_USER_FOR_ADMIN_ID,
356            )
357            .await
358            .unwrap();
359
360        let admin_user_after = user_info_reader
361            .read_guard()
362            .get_user_by_name("admin_user")
363            .cloned()
364            .unwrap();
365
366        assert!(!admin_user_after.is_admin);
367        assert!(!admin_user_after.is_super);
368    }
369}