risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_subscriptions.rs1use risingwave_common::types::{Fields, Timestamptz};
16use risingwave_frontend_macro::system_catalog;
17use risingwave_pb::user::grant_privilege::Object;
18
19use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
20use crate::error::Result;
21
22#[derive(Fields)]
23struct RwSubscription {
24 #[primary_key]
25 id: i32,
26 name: String,
27 schema_id: i32,
28 owner: i32,
29 definition: String,
30 acl: Vec<String>,
31 initialized_at: Option<Timestamptz>,
32 created_at: Option<Timestamptz>,
33 initialized_at_cluster_version: Option<String>,
34 created_at_cluster_version: Option<String>,
35}
36
37#[system_catalog(table, "rw_catalog.rw_subscriptions")]
38fn read_rw_subscriptions_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSubscription>> {
39 let catalog_reader = reader.catalog_reader.read_guard();
40 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
41 let user_reader = reader.user_info_reader.read_guard();
42 let current_user = user_reader
43 .get_user_by_name(&reader.auth_context.user_name)
44 .expect("user not found");
45 let users = user_reader.get_all_users();
46 let username_map = user_reader.get_user_name_map();
47
48 Ok(schemas
49 .flat_map(|schema| {
50 schema
51 .iter_subscription_with_acl(current_user)
52 .map(|subscription| RwSubscription {
53 id: subscription.id.subscription_id as i32,
54 name: subscription.name.clone(),
55 schema_id: schema.id() as i32,
56 owner: subscription.owner.user_id as i32,
57 definition: subscription.definition.clone(),
58 acl: get_acl_items(
59 &Object::SubscriptionId(subscription.id.subscription_id),
60 false,
61 &users,
62 username_map,
63 ),
64 initialized_at: subscription
65 .initialized_at_epoch
66 .map(|e| e.as_timestamptz()),
67 created_at: subscription.created_at_epoch.map(|e| e.as_timestamptz()),
68 initialized_at_cluster_version: subscription
69 .initialized_at_cluster_version
70 .clone(),
71 created_at_cluster_version: subscription.created_at_cluster_version.clone(),
72 })
73 })
74 .collect())
75}