risingwave_frontend/handler/
create_user.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
17use risingwave_pb::user::{Action, GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27 OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29
30pub async fn handle_create_user(
31 handler_args: HandlerArgs,
32 stmt: CreateUserStatement,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let database_id = {
36 let catalog_reader = session.env().catalog_reader().read_guard();
37 catalog_reader
38 .get_database_by_name(&session.database())
39 .expect("session database should exist")
40 .id()
41 };
42
43 let user_name = Binder::resolve_user_name(stmt.user_name)?;
44 let mut user_info = UserInfo {
45 name: user_name.clone(),
46 can_login: true,
48 ..Default::default()
49 };
50 let mut notice = None;
51 {
52 let user_reader = session.env().user_info_reader().read_guard();
53 if user_reader.get_user_by_name(&user_name).is_some() {
54 return Err(CatalogError::duplicated("user", user_name).into());
55 }
56
57 let session_user = user_reader
58 .get_user_by_name(&session.user_name())
59 .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
60
61 if !session_user.is_super {
62 let require_super = stmt
63 .with_options
64 .0
65 .iter()
66 .any(|option| matches!(option, UserOption::SuperUser));
67 if require_super {
68 return Err(
69 PermissionDenied("must be superuser to create superusers".to_owned()).into(),
70 );
71 }
72
73 if !session_user.can_create_user {
74 return Err(PermissionDenied("permission denied to create user".to_owned()).into());
75 }
76 }
77
78 user_info.grant_privileges = vec![GrantPrivilege {
81 action_with_opts: vec![ActionWithGrantOption {
82 action: Action::Connect as i32,
83 with_grant_option: true,
84 granted_by: session_user.id,
85 }],
86 object: Some(Object::DatabaseId(database_id)),
87 }];
88
89 for option in stmt.with_options.0 {
90 match option {
91 UserOption::SuperUser => user_info.is_super = true,
92 UserOption::NoSuperUser => user_info.is_super = false,
93 UserOption::CreateDB => user_info.can_create_db = true,
94 UserOption::NoCreateDB => user_info.can_create_db = false,
95 UserOption::CreateUser => user_info.can_create_user = true,
96 UserOption::NoCreateUser => user_info.can_create_user = false,
97 UserOption::Login => user_info.can_login = true,
98 UserOption::NoLogin => user_info.can_login = false,
99 UserOption::EncryptedPassword(password) => {
100 if !password.0.is_empty() {
101 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
102 } else {
103 notice = Some(
104 "empty string is not a valid password, clearing password".to_owned(),
105 );
106 }
107 }
108 UserOption::Password(opt) => {
109 if let Some(password) = opt
110 && !password.0.is_empty()
111 {
112 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
113 } else {
114 notice = Some(
115 "empty string is not a valid password, clearing password".to_owned(),
116 );
117 }
118 }
119 UserOption::OAuth(options) => {
120 let auth_info = build_oauth_info(&options).ok_or_else(|| {
121 ErrorCode::InvalidParameterValue(format!(
122 "{} and {} must be provided",
123 OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
124 ))
125 })?;
126 user_info.auth_info = Some(auth_info);
127 }
128 }
129 }
130 };
131
132 let user_info_writer = session.user_info_writer()?;
133 user_info_writer.create_user(user_info).await?;
134 let response_builder = RwPgResponse::builder(StatementType::CREATE_USER);
135 if let Some(notice) = notice {
136 Ok(response_builder.notice(notice).into())
137 } else {
138 Ok(response_builder.into())
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use std::collections::HashMap;
145
146 use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
147 use risingwave_pb::user::AuthInfo;
148 use risingwave_pb::user::auth_info::EncryptionType;
149
150 use crate::test_utils::LocalFrontend;
151
152 #[tokio::test]
153 async fn test_create_user() {
154 let frontend = LocalFrontend::new(Default::default()).await;
155 let session = frontend.session_ref();
156 let user_info_reader = session.env().user_info_reader();
157
158 frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
159
160 let user_info = user_info_reader
161 .read_guard()
162 .get_user_by_name("user")
163 .cloned()
164 .unwrap();
165 assert!(!user_info.is_super);
166 assert!(user_info.can_login);
167 assert!(user_info.can_create_db);
168 assert!(!user_info.can_create_user);
169 assert_eq!(
170 user_info.auth_info,
171 Some(AuthInfo {
172 encryption_type: EncryptionType::Md5 as i32,
173 encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
174 metadata: HashMap::new(),
175 })
176 );
177 frontend
178 .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
179 .await
180 .unwrap();
181 assert!(
182 frontend
183 .run_user_sql(
184 "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
185 DEFAULT_DATABASE_NAME.to_owned(),
186 "user".to_owned(),
187 user_info.id
188 )
189 .await
190 .is_err()
191 );
192
193 assert!(
194 frontend
195 .run_user_sql(
196 "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
197 DEFAULT_DATABASE_NAME.to_owned(),
198 "usercreator".to_owned(),
199 user_info.id
200 )
201 .await
202 .is_ok()
203 );
204 assert!(
205 frontend
206 .run_user_sql(
207 "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
208 DEFAULT_DATABASE_NAME.to_owned(),
209 "usercreator".to_owned(),
210 user_info.id
211 )
212 .await
213 .is_err()
214 );
215 }
216}