risingwave_frontend/handler/
create_user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
17use risingwave_pb::user::{Action, GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27    OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29
30pub async fn handle_create_user(
31    handler_args: HandlerArgs,
32    stmt: CreateUserStatement,
33) -> Result<RwPgResponse> {
34    let session = handler_args.session;
35    let database_id = {
36        let catalog_reader = session.env().catalog_reader().read_guard();
37        catalog_reader
38            .get_database_by_name(&session.database())
39            .expect("session database should exist")
40            .id()
41    };
42
43    let user_name = Binder::resolve_user_name(stmt.user_name)?;
44    let mut user_info = UserInfo {
45        name: user_name.clone(),
46        // the LOGIN option is implied if it is not explicitly specified.
47        can_login: true,
48        ..Default::default()
49    };
50    let mut notices = vec![];
51    {
52        let user_reader = session.env().user_info_reader().read_guard();
53        if user_reader.get_user_by_name(&user_name).is_some() {
54            return Err(CatalogError::duplicated("user", user_name).into());
55        }
56
57        let session_user = user_reader
58            .get_user_by_name(&session.user_name())
59            .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
60
61        if !session_user.is_super {
62            let require_super = stmt
63                .with_options
64                .0
65                .iter()
66                .any(|option| matches!(option, UserOption::SuperUser));
67            if require_super {
68                return Err(
69                    PermissionDenied("must be superuser to create superusers".to_owned()).into(),
70                );
71            }
72
73            if !session_user.can_create_user {
74                return Err(PermissionDenied("permission denied to create user".to_owned()).into());
75            }
76        }
77
78        // Admin users can only be created by other admin users
79        let require_admin = stmt
80            .with_options
81            .0
82            .iter()
83            .any(|option| matches!(option, UserOption::Admin));
84        if require_admin && !session_user.is_admin {
85            return Err(
86                PermissionDenied("only admin users can create admin users".to_owned()).into(),
87            );
88        }
89
90        // Since we don't have a concept of PUBLIC group yet, here we simply grant new user with CONNECT
91        // action for the session database.
92        user_info.grant_privileges = vec![GrantPrivilege {
93            action_with_opts: vec![ActionWithGrantOption {
94                action: Action::Connect as i32,
95                with_grant_option: true,
96                granted_by: session_user.id,
97            }],
98            object: Some(Object::DatabaseId(database_id)),
99        }];
100
101        for option in stmt.with_options.0 {
102            match option {
103                UserOption::SuperUser => user_info.is_super = true,
104                UserOption::NoSuperUser => user_info.is_super = false,
105                UserOption::CreateDB => user_info.can_create_db = true,
106                UserOption::NoCreateDB => user_info.can_create_db = false,
107                UserOption::CreateUser => user_info.can_create_user = true,
108                UserOption::NoCreateUser => user_info.can_create_user = false,
109                UserOption::Login => user_info.can_login = true,
110                UserOption::NoLogin => user_info.can_login = false,
111                UserOption::Admin => user_info.is_admin = true,
112                UserOption::NoAdmin => user_info.is_admin = false,
113                UserOption::EncryptedPassword(password) => {
114                    if !password.0.is_empty() {
115                        user_info.auth_info = encrypted_password(&user_info.name, &password.0);
116                    } else {
117                        notices.push(
118                            "empty string is not a valid password, clearing password".to_owned(),
119                        );
120                    }
121                }
122                UserOption::Password(opt) => {
123                    if let Some(password) = opt
124                        && !password.0.is_empty()
125                    {
126                        user_info.auth_info = encrypted_password(&user_info.name, &password.0);
127                    } else {
128                        notices.push(
129                            "empty string is not a valid password, clearing password".to_owned(),
130                        );
131                    }
132                }
133                UserOption::OAuth(options) => {
134                    let auth_info = build_oauth_info(&options).ok_or_else(|| {
135                        ErrorCode::InvalidParameterValue(format!(
136                            "{} and {} must be provided",
137                            OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
138                        ))
139                    })?;
140                    user_info.auth_info = Some(auth_info);
141                }
142            }
143        }
144
145        if user_info.is_admin && !user_info.is_super {
146            // Admin users are always superusers
147            user_info.is_super = true;
148            notices.push("Admin users are always superusers".to_owned());
149        }
150    };
151
152    let user_info_writer = session.user_info_writer()?;
153    user_info_writer.create_user(user_info).await?;
154    let mut response_builder = RwPgResponse::builder(StatementType::CREATE_USER);
155    for notice in notices {
156        response_builder = response_builder.notice(notice);
157    }
158    Ok(response_builder.into())
159}
160
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID,
    };
    use risingwave_pb::user::AuthInfo;
    use risingwave_pb::user::auth_info::EncryptionType;

    use crate::test_utils::LocalFrontend;

    /// Covers plain user creation: option flags, MD5 password storage, and the permission
    /// checks applied when a non-superuser issues `CREATE USER`.
    #[tokio::test]
    async fn test_create_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();

        let user_info = user_info_reader
            .read_guard()
            .get_user_by_name("user")
            .cloned()
            .unwrap();
        assert!(!user_info.is_super);
        assert!(user_info.can_login);
        assert!(user_info.can_create_db);
        assert!(!user_info.can_create_user);
        assert_eq!(
            user_info.auth_info,
            Some(AuthInfo {
                encryption_type: EncryptionType::Md5 as i32,
                encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
                metadata: HashMap::new(),
            })
        );

        frontend
            .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
            .await
            .unwrap();
        // Look the creator up so that sessions opened under its name also carry its own id.
        let user_creator = user_info_reader
            .read_guard()
            .get_user_by_name("usercreator")
            .cloned()
            .unwrap();
        assert!(user_creator.can_create_user);
        // An empty password string leaves the user without any stored credentials.
        assert!(user_creator.auth_info.is_none());

        // `user` has neither SUPERUSER nor CREATEUSER, so it must not be able to create users.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "user".to_owned(),
                    user_info.id
                )
                .await
                .is_err()
        );

        // `usercreator` holds CREATEUSER and may create ordinary users ...
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_ok()
        );
        // ... but, not being a superuser itself, it may not create superusers.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_err()
        );
    }

    /// Covers the `ADMIN` / `NOADMIN` options: only an admin session may create admin users,
    /// and an admin user is always stored as a superuser.
    #[tokio::test]
    async fn test_create_admin_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        frontend
            .run_sql("CREATE USER no_admin_user WITH NOADMIN")
            .await
            .unwrap();

        let no_admin_user = user_info_reader
            .read_guard()
            .get_user_by_name("no_admin_user")
            .cloned()
            .unwrap();
        assert!(!no_admin_user.is_admin);

        // The default `run_sql` session is expected to be rejected when requesting ADMIN.
        assert!(
            frontend
                .run_sql("CREATE USER admin_user WITH ADMIN")
                .await
                .is_err(),
            "admin users can only be created by other admin users"
        );

        // NOSUPERUSER is requested explicitly, yet ADMIN forces the superuser flag on.
        frontend
            .run_user_sql(
                "CREATE USER admin_user WITH NOSUPERUSER CREATEUSER ADMIN",
                DEFAULT_DATABASE_NAME.to_owned(),
                DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
                DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            )
            .await
            .unwrap();
        let admin_user = user_info_reader
            .read_guard()
            .get_user_by_name("admin_user")
            .cloned()
            .unwrap();
        assert!(admin_user.is_admin);
        assert!(admin_user.is_super);
    }
}
281}