risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_subscriptions.rs1use risingwave_common::types::{Fields, Timestamptz};
16use risingwave_frontend_macro::system_catalog;
17
18use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
19use crate::error::Result;
20
21#[derive(Fields)]
22struct RwSubscription {
23 #[primary_key]
24 id: i32,
25 name: String,
26 schema_id: i32,
27 owner: i32,
28 definition: String,
29 acl: Vec<String>,
30 initialized_at: Option<Timestamptz>,
31 created_at: Option<Timestamptz>,
32 initialized_at_cluster_version: Option<String>,
33 created_at_cluster_version: Option<String>,
34}
35
36#[system_catalog(table, "rw_catalog.rw_subscriptions")]
37fn read_rw_subscriptions_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSubscription>> {
38 let catalog_reader = reader.catalog_reader.read_guard();
39 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
40 let user_reader = reader.user_info_reader.read_guard();
41 let current_user = user_reader
42 .get_user_by_name(&reader.auth_context.user_name)
43 .expect("user not found");
44 let users = user_reader.get_all_users();
45 let username_map = user_reader.get_user_name_map();
46
47 Ok(schemas
48 .flat_map(|schema| {
49 schema
50 .iter_subscription_with_acl(current_user)
51 .map(|subscription| RwSubscription {
52 id: subscription.id.as_i32_id(),
53 name: subscription.name.clone(),
54 schema_id: schema.id().as_i32_id(),
55 owner: subscription.owner.user_id as i32,
56 definition: subscription.definition.clone(),
57 acl: get_acl_items(subscription.id, false, &users, username_map),
58 initialized_at: subscription
59 .initialized_at_epoch
60 .map(|e| e.as_timestamptz()),
61 created_at: subscription.created_at_epoch.map(|e| e.as_timestamptz()),
62 initialized_at_cluster_version: subscription
63 .initialized_at_cluster_version
64 .clone(),
65 created_at_cluster_version: subscription.created_at_cluster_version.clone(),
66 })
67 })
68 .collect())
69}