risingwave_frontend/handler/
create_user.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_pb::user::grant_privilege::{Action, ActionWithGrantOption, Object};
17use risingwave_pb::user::{GrantPrivilege, UserInfo};
18use risingwave_sqlparser::ast::{CreateUserStatement, UserOption, UserOptions};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::{CatalogError, DatabaseId};
23use crate::error::ErrorCode::{self, PermissionDenied};
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::user::user_authentication::{
27 OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY, build_oauth_info, encrypted_password,
28};
29use crate::user::user_catalog::UserCatalog;
30
31fn make_prost_user_info(
32 user_name: String,
33 options: &UserOptions,
34 session_user: &UserCatalog,
35 database_id: DatabaseId,
36) -> Result<UserInfo> {
37 if !session_user.is_super {
38 let require_super = options
39 .0
40 .iter()
41 .any(|option| matches!(option, UserOption::SuperUser));
42 if require_super {
43 return Err(
44 PermissionDenied("must be superuser to create superusers".to_owned()).into(),
45 );
46 }
47
48 if !session_user.can_create_user {
49 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
50 }
51 }
52
53 let grant_privileges = vec![GrantPrivilege {
56 action_with_opts: vec![ActionWithGrantOption {
57 action: Action::Connect as i32,
58 with_grant_option: true,
59 granted_by: session_user.id,
60 }],
61 object: Some(Object::DatabaseId(database_id)),
62 }];
63
64 let mut user_info = UserInfo {
65 name: user_name,
66 can_login: true,
68 grant_privileges,
69 ..Default::default()
70 };
71
72 for option in &options.0 {
73 match option {
74 UserOption::SuperUser => user_info.is_super = true,
75 UserOption::NoSuperUser => user_info.is_super = false,
76 UserOption::CreateDB => user_info.can_create_db = true,
77 UserOption::NoCreateDB => user_info.can_create_db = false,
78 UserOption::CreateUser => user_info.can_create_user = true,
79 UserOption::NoCreateUser => user_info.can_create_user = false,
80 UserOption::Login => user_info.can_login = true,
81 UserOption::NoLogin => user_info.can_login = false,
82 UserOption::EncryptedPassword(password) => {
83 if !password.0.is_empty() {
85 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
86 }
87 }
88 UserOption::Password(opt) => {
89 if let Some(password) = opt
91 && !password.0.is_empty()
92 {
93 user_info.auth_info = encrypted_password(&user_info.name, &password.0);
94 }
95 }
96 UserOption::OAuth(options) => {
97 let auth_info = build_oauth_info(options).ok_or_else(|| {
98 ErrorCode::InvalidParameterValue(format!(
99 "{} and {} must be provided",
100 OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
101 ))
102 })?;
103 user_info.auth_info = Some(auth_info);
104 }
105 }
106 }
107
108 Ok(user_info)
109}
110
111pub async fn handle_create_user(
112 handler_args: HandlerArgs,
113 stmt: CreateUserStatement,
114) -> Result<RwPgResponse> {
115 let session = handler_args.session;
116 let database_id = {
117 let catalog_reader = session.env().catalog_reader().read_guard();
118 catalog_reader
119 .get_database_by_name(&session.database())
120 .expect("session database should exist")
121 .id()
122 };
123 let user_info = {
124 let user_name = Binder::resolve_user_name(stmt.user_name)?;
125 let user_reader = session.env().user_info_reader().read_guard();
126 if user_reader.get_user_by_name(&user_name).is_some() {
127 return Err(CatalogError::duplicated("user", user_name).into());
128 }
129
130 let session_user = user_reader
131 .get_user_by_name(&session.user_name())
132 .ok_or_else(|| CatalogError::NotFound("user", session.user_name()))?;
133
134 make_prost_user_info(user_name, &stmt.with_options, session_user, database_id)?
135 };
136
137 let user_info_writer = session.user_info_writer()?;
138 user_info_writer.create_user(user_info).await?;
139 Ok(PgResponse::empty_result(StatementType::CREATE_USER))
140}
141
142#[cfg(test)]
143mod tests {
144 use std::collections::HashMap;
145
146 use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
147 use risingwave_pb::user::AuthInfo;
148 use risingwave_pb::user::auth_info::EncryptionType;
149
150 use crate::test_utils::LocalFrontend;
151
152 #[tokio::test]
153 async fn test_create_user() {
154 let frontend = LocalFrontend::new(Default::default()).await;
155 let session = frontend.session_ref();
156 let user_info_reader = session.env().user_info_reader();
157
158 frontend.run_sql("CREATE USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
159
160 let user_info = user_info_reader
161 .read_guard()
162 .get_user_by_name("user")
163 .cloned()
164 .unwrap();
165 assert!(!user_info.is_super);
166 assert!(user_info.can_login);
167 assert!(user_info.can_create_db);
168 assert!(!user_info.can_create_user);
169 assert_eq!(
170 user_info.auth_info,
171 Some(AuthInfo {
172 encryption_type: EncryptionType::Md5 as i32,
173 encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
174 metadata: HashMap::new(),
175 })
176 );
177 frontend
178 .run_sql("CREATE USER usercreator WITH NOSUPERUSER CREATEUSER PASSWORD ''")
179 .await
180 .unwrap();
181 assert!(
182 frontend
183 .run_user_sql(
184 "CREATE USER fail WITH PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
185 DEFAULT_DATABASE_NAME.to_owned(),
186 "user".to_owned(),
187 user_info.id
188 )
189 .await
190 .is_err()
191 );
192
193 assert!(
194 frontend
195 .run_user_sql(
196 "CREATE USER success WITH NOSUPERUSER PASSWORD ''",
197 DEFAULT_DATABASE_NAME.to_owned(),
198 "usercreator".to_owned(),
199 user_info.id
200 )
201 .await
202 .is_ok()
203 );
204 assert!(
205 frontend
206 .run_user_sql(
207 "CREATE USER fail2 WITH SUPERUSER PASSWORD ''",
208 DEFAULT_DATABASE_NAME.to_owned(),
209 "usercreator".to_owned(),
210 user_info.id
211 )
212 .await
213 .is_err()
214 );
215 }
216}