risingwave_frontend/handler/
create_user.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
17use risingwave_pb::user::{Action, GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27 OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29
30pub async fn handle_create_user(
31 handler_args: HandlerArgs,
32 stmt: CreateUserStatement,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let database_id = {
36 let catalog_reader = session.env().catalog_reader().read_guard();
37 catalog_reader
38 .get_database_by_name(&session.database())
39 .expect("session database should exist")
40 .id()
41 };
42
43 let user_name = Binder::resolve_user_name(stmt.user_name)?;
44 let mut user_info = UserInfo {
45 name: user_name.clone(),
46 can_login: true,
48 ..Default::default()
49 };
50 let mut notices = vec![];
51 {
52 let user_reader = session.env().user_info_reader().read_guard();
53 if user_reader.get_user_by_name(&user_name).is_some() {
54 return Err(CatalogError::duplicated("user", user_name).into());
55 }
56
57 let session_user = user_reader
58 .get_user_by_name(&session.user_name())
59 .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
60
61 if !session_user.is_super {
62 let require_super = stmt
63 .with_options
64 .0
65 .iter()
66 .any(|option| matches!(option, UserOption::SuperUser));
67 if require_super {
68 return Err(
69 PermissionDenied("must be superuser to create superusers".to_owned()).into(),
70 );
71 }
72
73 if !session_user.can_create_user {
74 return Err(PermissionDenied("permission denied to create user".to_owned()).into());
75 }
76 }
77
78 let require_admin = stmt
80 .with_options
81 .0
82 .iter()
83 .any(|option| matches!(option, UserOption::Admin));
84 if require_admin && !session_user.is_admin {
85 return Err(
86 PermissionDenied("only admin users can create admin users".to_owned()).into(),
87 );
88 }
89
90 user_info.grant_privileges = vec![GrantPrivilege {
93 action_with_opts: vec![ActionWithGrantOption {
94 action: Action::Connect as i32,
95 with_grant_option: true,
96 granted_by: session_user.id,
97 }],
98 object: Some(Object::DatabaseId(database_id)),
99 }];
100
101 for option in stmt.with_options.0 {
102 match option {
103 UserOption::SuperUser => user_info.is_super = true,
104 UserOption::NoSuperUser => user_info.is_super = false,
105 UserOption::CreateDB => user_info.can_create_db = true,
106 UserOption::NoCreateDB => user_info.can_create_db = false,
107 UserOption::CreateUser => user_info.can_create_user = true,
108 UserOption::NoCreateUser => user_info.can_create_user = false,
109 UserOption::Login => user_info.can_login = true,
110 UserOption::NoLogin => user_info.can_login = false,
111 UserOption::Admin => user_info.is_admin = true,
112 UserOption::NoAdmin => user_info.is_admin = false,
113 UserOption::EncryptedPassword(password) => {
114 if !password.0.is_empty() {
115 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
116 } else {
117 notices.push(
118 "empty string is not a valid password, clearing password".to_owned(),
119 );
120 }
121 }
122 UserOption::Password(opt) => {
123 if let Some(password) = opt
124 && !password.0.is_empty()
125 {
126 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
127 } else {
128 notices.push(
129 "empty string is not a valid password, clearing password".to_owned(),
130 );
131 }
132 }
133 UserOption::OAuth(options) => {
134 let auth_info = build_oauth_info(&options).ok_or_else(|| {
135 ErrorCode::InvalidParameterValue(format!(
136 "{} and {} must be provided",
137 OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
138 ))
139 })?;
140 user_info.auth_info = Some(auth_info);
141 }
142 }
143 }
144
145 if user_info.is_admin && !user_info.is_super {
146 user_info.is_super = true;
148 notices.push("Admin users are always superusers".to_owned());
149 }
150 };
151
152 let user_info_writer = session.user_info_writer()?;
153 user_info_writer.create_user(user_info).await?;
154 let mut response_builder = RwPgResponse::builder(StatementType::CREATE_USER);
155 for notice in notices {
156 response_builder = response_builder.notice(notice);
157 }
158 Ok(response_builder.into())
159}
160
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID,
    };
    use risingwave_pb::user::AuthInfo;
    use risingwave_pb::user::auth_info::EncryptionType;

    use crate::test_utils::LocalFrontend;

    /// Covers option parsing into the stored user info and the permission checks applied
    /// to non-superuser sessions.
    #[tokio::test]
    async fn test_create_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();

        let user_info = user_info_reader
            .read_guard()
            .get_user_by_name("user")
            .cloned()
            .unwrap();
        assert!(!user_info.is_super);
        assert!(user_info.can_login);
        assert!(user_info.can_create_db);
        assert!(!user_info.can_create_user);
        assert_eq!(
            user_info.auth_info,
            Some(AuthInfo {
                encryption_type: EncryptionType::Md5 as i32,
                encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
                metadata: HashMap::new(),
            })
        );

        frontend
            .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
            .await
            .unwrap();
        // Look up `usercreator` so that sessions opened under that name also carry its
        // own id rather than the id of `user`.
        let user_creator = user_info_reader
            .read_guard()
            .get_user_by_name("usercreator")
            .cloned()
            .unwrap();
        assert!(user_creator.can_create_user);
        // An empty-string password leaves the user without credentials.
        assert!(user_creator.auth_info.is_none());

        // `user` has neither SUPERUSER nor CREATEUSER, so it cannot create users.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "user".to_owned(),
                    user_info.id
                )
                .await
                .is_err()
        );

        // `usercreator` holds CREATEUSER and may create ordinary users...
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_ok()
        );
        // ...but not superusers, since it is not a superuser itself.
        assert!(
            frontend
                .run_user_sql(
                    "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
                    DEFAULT_DATABASE_NAME.to_owned(),
                    "usercreator".to_owned(),
                    user_creator.id
                )
                .await
                .is_err()
        );
    }

    /// Covers the `ADMIN` / `NOADMIN` options: only an admin session may create admin
    /// users, and an admin user ends up as a superuser even with `NOSUPERUSER`.
    #[tokio::test]
    async fn test_create_admin_user() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let user_info_reader = session.env().user_info_reader();

        frontend
            .run_sql("CREATE USER no_admin_user WITH NOADMIN")
            .await
            .unwrap();

        let no_admin_user = user_info_reader
            .read_guard()
            .get_user_by_name("no_admin_user")
            .cloned()
            .unwrap();
        assert!(!no_admin_user.is_admin);

        assert!(
            frontend
                .run_sql("CREATE USER admin_user WITH ADMIN")
                .await
                .is_err(),
            "admin users can only be created by other admin users"
        );

        frontend
            .run_user_sql(
                "CREATE USER admin_user WITH NOSUPERUSER CREATEUSER ADMIN",
                DEFAULT_DATABASE_NAME.to_owned(),
                DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
                DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            )
            .await
            .unwrap();
        let admin_user = user_info_reader
            .read_guard()
            .get_user_by_name("admin_user")
            .cloned()
            .unwrap();
        assert!(admin_user.is_admin);
        // `NOSUPERUSER` is overridden because admin users are always superusers.
        assert!(admin_user.is_super);
    }
}