risingwave_frontend/user/
user_service.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use parking_lot::lock_api::ArcRwLockReadGuard;
19use parking_lot::{RawRwLock, RwLock};
20use risingwave_pb::user::update_user_request::UpdateField;
21use risingwave_pb::user::{GrantPrivilege, UserInfo};
22use risingwave_rpc_client::MetaClient;
23use tokio::sync::watch::Receiver;
24
25use crate::error::Result;
26use crate::user::user_manager::UserInfoManager;
27use crate::user::{UserId, UserInfoVersion};
28
29pub type UserInfoReadGuard = ArcRwLockReadGuard<RawRwLock, UserInfoManager>;
30
31#[derive(Clone)]
32pub struct UserInfoReader(Arc<RwLock<UserInfoManager>>);
33impl UserInfoReader {
34 pub fn new(inner: Arc<RwLock<UserInfoManager>>) -> Self {
35 UserInfoReader(inner)
36 }
37
38 pub fn read_guard(&self) -> UserInfoReadGuard {
39 self.0.read_arc_recursive()
41 }
42}
43
44#[async_trait::async_trait]
45pub trait UserInfoWriter: Send + Sync {
46 async fn create_user(&self, user_info: UserInfo) -> Result<()>;
47
48 async fn drop_user(&self, id: UserId) -> Result<()>;
49
50 async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()>;
51
52 async fn grant_privilege(
53 &self,
54 users: Vec<UserId>,
55 privileges: Vec<GrantPrivilege>,
56 with_grant_option: bool,
57 grantor: UserId,
58 ) -> Result<()>;
59
60 async fn revoke_privilege(
61 &self,
62 users: Vec<UserId>,
63 privileges: Vec<GrantPrivilege>,
64 granted_by: UserId,
65 revoke_by: UserId,
66 revoke_grant_option: bool,
67 cascade: bool,
68 ) -> Result<()>;
69}
70
71#[derive(Clone)]
72pub struct UserInfoWriterImpl {
73 meta_client: MetaClient,
74 user_updated_rx: Receiver<UserInfoVersion>,
75}
76
77#[async_trait::async_trait]
78impl UserInfoWriter for UserInfoWriterImpl {
79 async fn create_user(&self, user_info: UserInfo) -> Result<()> {
80 let version = self.meta_client.create_user(user_info).await?;
81 self.wait_version(version).await
82 }
83
84 async fn drop_user(&self, id: UserId) -> Result<()> {
85 let version = self.meta_client.drop_user(id).await?;
86 self.wait_version(version).await
87 }
88
89 async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()> {
90 let version = self.meta_client.update_user(user, update_fields).await?;
91 self.wait_version(version).await
92 }
93
94 async fn grant_privilege(
95 &self,
96 users: Vec<UserId>,
97 privileges: Vec<GrantPrivilege>,
98 with_grant_option: bool,
99 granted_by: UserId,
100 ) -> Result<()> {
101 let version = self
102 .meta_client
103 .grant_privilege(users, privileges, with_grant_option, granted_by)
104 .await?;
105 self.wait_version(version).await
106 }
107
108 async fn revoke_privilege(
109 &self,
110 users: Vec<UserId>,
111 privileges: Vec<GrantPrivilege>,
112 granted_by: UserId,
113 revoke_by: UserId,
114 revoke_grant_option: bool,
115 cascade: bool,
116 ) -> Result<()> {
117 let version = self
118 .meta_client
119 .revoke_privilege(
120 users,
121 privileges,
122 granted_by,
123 revoke_by,
124 revoke_grant_option,
125 cascade,
126 )
127 .await?;
128 self.wait_version(version).await
129 }
130}
131
132impl UserInfoWriterImpl {
133 pub fn new(meta_client: MetaClient, user_updated_rx: Receiver<UserInfoVersion>) -> Self {
134 UserInfoWriterImpl {
135 meta_client,
136 user_updated_rx,
137 }
138 }
139
140 async fn wait_version(&self, version: UserInfoVersion) -> Result<()> {
141 let mut rx = self.user_updated_rx.clone();
142 while *rx.borrow_and_update() < version {
143 rx.changed().await.map_err(|e| anyhow!(e))?;
144 }
145 Ok(())
146 }
147}