risingwave_frontend/handler/
create_user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_pb::user::grant_privilege::{Action, ActionWithGrantOption, Object};
17use risingwave_pb::user::{GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption, UserOptions};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::{CatalogError, DatabaseId};
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27    OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29use crate::user::user_catalog::UserCatalog;
30
31fn make_prost_user_info(
32    user_name: String,
33    options: &UserOptions,
34    session_user: &UserCatalog,
35    database_id: DatabaseId,
36) -> Result<UserInfo> {
37    if !session_user.is_super {
38        let require_super = options
39            .0
40            .iter()
41            .any(|option| matches!(option, UserOption::SuperUser));
42        if require_super {
43            return Err(
44                PermissionDenied("must be superuser to create superusers".to_owned()).into(),
45            );
46        }
47
48        if !session_user.can_create_user {
49            return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
50        }
51    }
52
53    // Since we don't have concept of PUBLIC group yet, here we simply grant new user with CONNECT
54    // action of session database.
55    let grant_privileges = vec![GrantPrivilege {
56        action_with_opts: vec![ActionWithGrantOption {
57            action: Action::Connect as i32,
58            with_grant_option: true,
59            granted_by: session_user.id,
60        }],
61        object: Some(Object::DatabaseId(database_id)),
62    }];
63
64    let mut user_info = UserInfo {
65        name: user_name,
66        // the LOGIN option is implied if it is not explicitly specified.
67        can_login: true,
68        grant_privileges,
69        ..Default::default()
70    };
71
72    for option in &options.0 {
73        match option {
74            UserOption::SuperUser => user_info.is_super = true,
75            UserOption::NoSuperUser => user_info.is_super = false,
76            UserOption::CreateDB => user_info.can_create_db = true,
77            UserOption::NoCreateDB => user_info.can_create_db = false,
78            UserOption::CreateUser => user_info.can_create_user = true,
79            UserOption::NoCreateUser => user_info.can_create_user = false,
80            UserOption::Login => user_info.can_login = true,
81            UserOption::NoLogin => user_info.can_login = false,
82            UserOption::EncryptedPassword(password) => {
83                // TODO: Behaviour of PostgreSQL: Notice when password is empty string.
84                if !password.0.is_empty() {
85                    user_info.auth_info = encrypted_password(&user_info.name, &password.0);
86                }
87            }
88            UserOption::Password(opt) => {
89                // TODO: Behaviour of PostgreSQL: Notice when password is empty string.
90                if let Some(password) = opt
91                    && !password.0.is_empty()
92                {
93                    user_info.auth_info = encrypted_password(&user_info.name, &password.0);
94                }
95            }
96            UserOption::OAuth(options) => {
97                let auth_info = build_oauth_info(options).ok_or_else(|| {
98                    ErrorCode::InvalidParameterValue(format!(
99                        "{} and {} must be provided",
100                        OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
101                    ))
102                })?;
103                user_info.auth_info = Some(auth_info);
104            }
105        }
106    }
107
108    Ok(user_info)
109}
110
111pub async fn handle_create_user(
112    handler_args: HandlerArgs,
113    stmt: CreateUserStatement,
114) -> Result<RwPgResponse> {
115    let session = handler_args.session;
116    let database_id = {
117        let catalog_reader = session.env().catalog_reader().read_guard();
118        catalog_reader
119            .get_database_by_name(&session.database())
120            .expect("session database should exist")
121            .id()
122    };
123    let user_info = {
124        let user_name = Binder::resolve_user_name(stmt.user_name)?;
125        let user_reader = session.env().user_info_reader().read_guard();
126        if user_reader.get_user_by_name(&user_name).is_some() {
127            return Err(CatalogError::duplicated("user", user_name).into());
128        }
129
130        let session_user = user_reader
131            .get_user_by_name(&session.user_name())
132            .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
133
134        make_prost_user_info(user_name, &stmt.with_options, session_user, database_id)?
135    };
136
137    let user_info_writer = session.user_info_writer()?;
138    user_info_writer.create_user(user_info).await?;
139    Ok(PgResponse::empty_result(StatementType::CREATE_USER))
140}
141
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
    use risingwave_pb::user::AuthInfo;
    use risingwave_pb::user::auth_info::EncryptionType;

    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE USER` end to end: option handling for a newly created user, and the
    /// privilege checks applied when a non-superuser issues the statement.
    #[tokio::test]
    async fn test_create_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        // Created from the default frontend session: may create databases, but not users.
        frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();

        let user_info = user_info_reader
            .read_guard()
            .get_user_by_name("user")
            .cloned()
            .unwrap();
        assert!(!user_info.is_super);
        assert!(user_info.can_login);
        assert!(user_info.can_create_db);
        assert!(!user_info.can_create_user);
        assert_eq!(
            user_info.auth_info,
            Some(AuthInfo {
                encryption_type: EncryptionType::Md5 as i32,
                encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
                metadata: HashMap::new(),
            })
        );

        // A second non-superuser that does hold the create-user privilege.
        frontend
            .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
            .await
            .unwrap();
        // Look the creator up so its sessions below carry its own id rather than the id
        // of `user`.
        let user_creator = user_info_reader
            .read_guard()
            .get_user_by_name("usercreator")
            .cloned()
            .unwrap();
        assert!(user_creator.can_create_user);

        // `user` lacks the create-user privilege, so the statement must be rejected.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "user".to_owned(),
                    user_info.id
                )
                .await
                .is_err()
        );

        // `usercreator` may create ordinary users...
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_ok()
        );
        // ...but not superusers, because it is not a superuser itself.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_err()
        );
    }
}