risingwave_frontend/handler/
create_user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
17use risingwave_pb::user::{Action, GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27    OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29
30pub async fn handle_create_user(
31    handler_args: HandlerArgs,
32    stmt: CreateUserStatement,
33) -> Result<RwPgResponse> {
34    let session = handler_args.session;
35    let database_id = {
36        let catalog_reader = session.env().catalog_reader().read_guard();
37        catalog_reader
38            .get_database_by_name(&session.database())
39            .expect("session database should exist")
40            .id()
41    };
42
43    let user_name = Binder::resolve_user_name(stmt.user_name)?;
44    let mut user_info = UserInfo {
45        name: user_name.clone(),
46        // the LOGIN option is implied if it is not explicitly specified.
47        can_login: true,
48        ..Default::default()
49    };
50    let mut notice = None;
51    {
52        let user_reader = session.env().user_info_reader().read_guard();
53        if user_reader.get_user_by_name(&user_name).is_some() {
54            return Err(CatalogError::duplicated("user", user_name).into());
55        }
56
57        let session_user = user_reader
58            .get_user_by_name(&session.user_name())
59            .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
60
61        if !session_user.is_super {
62            let require_super = stmt
63                .with_options
64                .0
65                .iter()
66                .any(|option| matches!(option, UserOption::SuperUser));
67            if require_super {
68                return Err(
69                    PermissionDenied("must be superuser to create superusers".to_owned()).into(),
70                );
71            }
72
73            if !session_user.can_create_user {
74                return Err(PermissionDenied("permission denied to create user".to_owned()).into());
75            }
76        }
77
78        // Since we don't have a concept of PUBLIC group yet, here we simply grant new user with CONNECT
79        // action for the session database.
80        user_info.grant_privileges = vec![GrantPrivilege {
81            action_with_opts: vec![ActionWithGrantOption {
82                action: Action::Connect as i32,
83                with_grant_option: true,
84                granted_by: session_user.id,
85            }],
86            object: Some(Object::DatabaseId(database_id)),
87        }];
88
89        for option in stmt.with_options.0 {
90            match option {
91                UserOption::SuperUser => user_info.is_super = true,
92                UserOption::NoSuperUser => user_info.is_super = false,
93                UserOption::CreateDB => user_info.can_create_db = true,
94                UserOption::NoCreateDB => user_info.can_create_db = false,
95                UserOption::CreateUser => user_info.can_create_user = true,
96                UserOption::NoCreateUser => user_info.can_create_user = false,
97                UserOption::Login => user_info.can_login = true,
98                UserOption::NoLogin => user_info.can_login = false,
99                UserOption::EncryptedPassword(password) => {
100                    if !password.0.is_empty() {
101                        user_info.auth_info = encrypted_password(&user_info.name, &password.0);
102                    } else {
103                        notice = Some(
104                            "empty string is not a valid password, clearing password".to_owned(),
105                        );
106                    }
107                }
108                UserOption::Password(opt) => {
109                    if let Some(password) = opt
110                        && !password.0.is_empty()
111                    {
112                        user_info.auth_info = encrypted_password(&user_info.name, &password.0);
113                    } else {
114                        notice = Some(
115                            "empty string is not a valid password, clearing password".to_owned(),
116                        );
117                    }
118                }
119                UserOption::OAuth(options) => {
120                    let auth_info = build_oauth_info(&options).ok_or_else(|| {
121                        ErrorCode::InvalidParameterValue(format!(
122                            "{} and {} must be provided",
123                            OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
124                        ))
125                    })?;
126                    user_info.auth_info = Some(auth_info);
127                }
128            }
129        }
130    };
131
132    let user_info_writer = session.user_info_writer()?;
133    user_info_writer.create_user(user_info).await?;
134    let response_builder = RwPgResponse::builder(StatementType::CREATE_USER);
135    if let Some(notice) = notice {
136        Ok(response_builder.notice(notice).into())
137    } else {
138        Ok(response_builder.into())
139    }
140}
141
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
    use risingwave_pb::user::AuthInfo;
    use risingwave_pb::user::auth_info::EncryptionType;

    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE USER`: option handling, password storage, and the permission checks
    /// applied when a non-superuser session issues the statement.
    #[tokio::test]
    async fn test_create_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();

        let user_info = user_info_reader
            .read_guard()
            .get_user_by_name("user")
            .cloned()
            .unwrap();
        assert!(!user_info.is_super);
        assert!(user_info.can_login);
        assert!(user_info.can_create_db);
        assert!(!user_info.can_create_user);
        assert_eq!(
            user_info.auth_info,
            Some(AuthInfo {
                encryption_type: EncryptionType::Md5 as i32,
                encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
                metadata: HashMap::new(),
            })
        );

        frontend
            .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
            .await
            .unwrap();
        let creator_info = user_info_reader
            .read_guard()
            .get_user_by_name("usercreator")
            .cloned()
            .unwrap();
        assert!(!creator_info.is_super);
        assert!(creator_info.can_login);
        assert!(creator_info.can_create_user);
        // An empty password must leave the user without any stored credentials.
        assert_eq!(creator_info.auth_info, None);

        // `user` lacks CREATEUSER, so it must not be able to create users.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "user".to_owned(),
                    user_info.id
                )
                .await
                .is_err()
        );

        // `usercreator` has CREATEUSER and may create ordinary users. The session is opened with
        // usercreator's own id so that the session's user name and user id agree.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    creator_info.id
                )
                .await
                .is_ok()
        );
        // ...but a non-superuser may never create a superuser.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    creator_info.id
                )
                .await
                .is_err()
        );

        // Only the permitted statement may have produced a user.
        let reader = user_info_reader.read_guard();
        assert!(reader.get_user_by_name("success").is_some());
        assert!(reader.get_user_by_name("fail").is_none());
        assert!(reader.get_user_by_name("fail2").is_none());
    }
}