risingwave_frontend/user/
user_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use parking_lot::lock_api::ArcRwLockReadGuard;
19use parking_lot::{RawRwLock, RwLock};
20use risingwave_pb::user::update_user_request::UpdateField;
21use risingwave_pb::user::{GrantPrivilege, UserInfo};
22use risingwave_rpc_client::MetaClient;
23use tokio::sync::watch::Receiver;
24
25use crate::error::Result;
26use crate::user::user_manager::UserInfoManager;
27use crate::user::{UserId, UserInfoVersion};
28
29pub type UserInfoReadGuard = ArcRwLockReadGuard<RawRwLock, UserInfoManager>;
30
31#[derive(Clone)]
32pub struct UserInfoReader(Arc<RwLock<UserInfoManager>>);
33impl UserInfoReader {
34    pub fn new(inner: Arc<RwLock<UserInfoManager>>) -> Self {
35        UserInfoReader(inner)
36    }
37
38    pub fn read_guard(&self) -> UserInfoReadGuard {
39        // Make this recursive so that one can get this guard in the same thread without fear.
40        self.0.read_arc_recursive()
41    }
42}
43
44#[async_trait::async_trait]
45pub trait UserInfoWriter: Send + Sync {
46    async fn create_user(&self, user_info: UserInfo) -> Result<()>;
47
48    async fn drop_user(&self, id: UserId) -> Result<()>;
49
50    async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()>;
51
52    async fn grant_privilege(
53        &self,
54        users: Vec<UserId>,
55        privileges: Vec<GrantPrivilege>,
56        with_grant_option: bool,
57        grantor: UserId,
58    ) -> Result<()>;
59
60    async fn revoke_privilege(
61        &self,
62        users: Vec<UserId>,
63        privileges: Vec<GrantPrivilege>,
64        granted_by: UserId,
65        revoke_by: UserId,
66        revoke_grant_option: bool,
67        cascade: bool,
68    ) -> Result<()>;
69}
70
71#[derive(Clone)]
72pub struct UserInfoWriterImpl {
73    meta_client: MetaClient,
74    user_updated_rx: Receiver<UserInfoVersion>,
75}
76
77#[async_trait::async_trait]
78impl UserInfoWriter for UserInfoWriterImpl {
79    async fn create_user(&self, user_info: UserInfo) -> Result<()> {
80        let version = self.meta_client.create_user(user_info).await?;
81        self.wait_version(version).await
82    }
83
84    async fn drop_user(&self, id: UserId) -> Result<()> {
85        let version = self.meta_client.drop_user(id).await?;
86        self.wait_version(version).await
87    }
88
89    async fn update_user(&self, user: UserInfo, update_fields: Vec<UpdateField>) -> Result<()> {
90        let version = self.meta_client.update_user(user, update_fields).await?;
91        self.wait_version(version).await
92    }
93
94    async fn grant_privilege(
95        &self,
96        users: Vec<UserId>,
97        privileges: Vec<GrantPrivilege>,
98        with_grant_option: bool,
99        granted_by: UserId,
100    ) -> Result<()> {
101        let version = self
102            .meta_client
103            .grant_privilege(users, privileges, with_grant_option, granted_by)
104            .await?;
105        self.wait_version(version).await
106    }
107
108    async fn revoke_privilege(
109        &self,
110        users: Vec<UserId>,
111        privileges: Vec<GrantPrivilege>,
112        granted_by: UserId,
113        revoke_by: UserId,
114        revoke_grant_option: bool,
115        cascade: bool,
116    ) -> Result<()> {
117        let version = self
118            .meta_client
119            .revoke_privilege(
120                users,
121                privileges,
122                granted_by,
123                revoke_by,
124                revoke_grant_option,
125                cascade,
126            )
127            .await?;
128        self.wait_version(version).await
129    }
130}
131
132impl UserInfoWriterImpl {
133    pub fn new(meta_client: MetaClient, user_updated_rx: Receiver<UserInfoVersion>) -> Self {
134        UserInfoWriterImpl {
135            meta_client,
136            user_updated_rx,
137        }
138    }
139
140    async fn wait_version(&self, version: UserInfoVersion) -> Result<()> {
141        let mut rx = self.user_updated_rx.clone();
142        while *rx.borrow_and_update() < version {
143            rx.changed().await.map_err(|e| anyhow!(e))?;
144        }
145        Ok(())
146    }
147}