risingwave_frontend/user/
user_service.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use parking_lot::lock_api::ArcRwLockReadGuard;
19use parking_lot::{RawRwLock, RwLock};
20use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
21use risingwave_pb::user::update_user_request::UpdateField;
22use risingwave_pb::user::{GrantPrivilege, UserInfo};
23use risingwave_rpc_client::MetaClient;
24use tokio::sync::watch::Receiver;
25
26use crate::catalog::{DatabaseId, SchemaId};
27use crate::error::Result;
28use crate::user::user_manager::UserInfoManager;
29use crate::user::{UserId, UserInfoVersion};
30
/// Owned (`Arc`-backed) read guard over the shared [`UserInfoManager`], as
/// handed out by [`UserInfoReader::read_guard`].
pub type UserInfoReadGuard = ArcRwLockReadGuard<RawRwLock, UserInfoManager>;
32
33#[derive(Clone)]
34pub struct UserInfoReader(Arc<RwLock<UserInfoManager>>);
35impl UserInfoReader {
36 pub fn new(inner: Arc<RwLock<UserInfoManager>>) -> Self {
37 UserInfoReader(inner)
38 }
39
40 pub fn read_guard(&self) -> UserInfoReadGuard {
41 self.0.read_arc_recursive()
43 }
44}
45
/// Asynchronous write interface for user management: creating, dropping and
/// updating users, and granting, revoking or altering (default) privileges.
///
/// Every method reports only success or failure via `Result<()>`. Implementors
/// must be `Send + Sync`.
#[async_trait::async_trait]
pub trait UserInfoWriter: Send + Sync {
    /// Creates the user described by `user_info`.
    async fn create_user(&self, user_info: UserInfo) -> Result<()>;

    /// Drops the user identified by `id`.
    async fn drop_user(&self, id: UserId) -> Result<()>;

    /// Updates `user`; `update_fields` lists the fields the update applies to
    /// (presumably the only ones that are changed; confirm against the meta
    /// service).
    async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()>;

    /// Grants `privileges` to each of `users`.
    ///
    /// `with_grant_option` and `grantor` are passed along with the request;
    /// note that `UserInfoWriterImpl` names the last parameter `granted_by`.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        grantor: UserId,
    ) -> Result<()>;

    /// Revokes `privileges` from each of `users`.
    ///
    /// The exact meaning of `granted_by`, `revoke_by`, `revoke_grant_option`
    /// and `cascade` is decided by the implementor; `UserInfoWriterImpl`
    /// forwards all of them unchanged to the meta client.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<()>;

    /// Applies `operation` to the default privileges of `users` within
    /// `database_id` and the given `schemas`, on behalf of `granted_by`.
    async fn alter_default_privilege(
        &self,
        users: Vec<UserId>,
        database_id: DatabaseId,
        schemas: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()>;
}
81
/// [`UserInfoWriter`] implementation that sends every request through a
/// [`MetaClient`].
#[derive(Clone)]
pub struct UserInfoWriterImpl {
    /// Client used to issue the user/privilege RPCs.
    meta_client: MetaClient,
    /// Watch receiver carrying the latest observed [`UserInfoVersion`];
    /// `wait_version` polls a clone of it until the requested version shows up.
    user_updated_rx: Receiver<UserInfoVersion>,
}
87
88#[async_trait::async_trait]
89impl UserInfoWriter for UserInfoWriterImpl {
90 async fn create_user(&self, user_info: UserInfo) -> Result<()> {
91 let version = self.meta_client.create_user(user_info).await?;
92 self.wait_version(version).await
93 }
94
95 async fn drop_user(&self, id: UserId) -> Result<()> {
96 let version = self.meta_client.drop_user(id).await?;
97 self.wait_version(version).await
98 }
99
100 async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()> {
101 let version = self.meta_client.update_user(user, update_fields).await?;
102 self.wait_version(version).await
103 }
104
105 async fn grant_privilege(
106 &self,
107 users: Vec<UserId>,
108 privileges: Vec<GrantPrivilege>,
109 with_grant_option: bool,
110 granted_by: UserId,
111 ) -> Result<()> {
112 let version = self
113 .meta_client
114 .grant_privilege(users, privileges, with_grant_option, granted_by)
115 .await?;
116 self.wait_version(version).await
117 }
118
119 async fn revoke_privilege(
120 &self,
121 users: Vec<UserId>,
122 privileges: Vec<GrantPrivilege>,
123 granted_by: UserId,
124 revoke_by: UserId,
125 revoke_grant_option: bool,
126 cascade: bool,
127 ) -> Result<()> {
128 let version = self
129 .meta_client
130 .revoke_privilege(
131 users,
132 privileges,
133 granted_by,
134 revoke_by,
135 revoke_grant_option,
136 cascade,
137 )
138 .await?;
139 self.wait_version(version).await
140 }
141
142 async fn alter_default_privilege(
143 &self,
144 users: Vec<UserId>,
145 database_id: DatabaseId,
146 schemas: Vec<SchemaId>,
147 operation: AlterDefaultPrivilegeOperation,
148 granted_by: UserId,
149 ) -> Result<()> {
150 self.meta_client
151 .alter_default_privilege(users, database_id, schemas, operation, granted_by)
152 .await?;
153 Ok(())
154 }
155}
156
157impl UserInfoWriterImpl {
158 pub fn new(meta_client: MetaClient, user_updated_rx: Receiver<UserInfoVersion>) -> Self {
159 UserInfoWriterImpl {
160 meta_client,
161 user_updated_rx,
162 }
163 }
164
165 async fn wait_version(&self, version: UserInfoVersion) -> Result<()> {
166 let mut rx = self.user_updated_rx.clone();
167 while *rx.borrow_and_update() < version {
168 rx.changed().await.map_err(|e| anyhow!(e))?;
169 }
170 Ok(())
171 }
172}