risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_subscriptions.rs1use risingwave_common::id::{SchemaId, SubscriptionId, UserId};
16use risingwave_common::types::{Fields, Timestamptz};
17use risingwave_frontend_macro::system_catalog;
18
19use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
20use crate::error::Result;
21
22#[derive(Fields)]
23struct RwSubscription {
24 #[primary_key]
25 id: SubscriptionId,
26 name: String,
27 schema_id: SchemaId,
28 owner: UserId,
29 definition: String,
30 retention_seconds: i64,
31 acl: Vec<String>,
32 initialized_at: Option<Timestamptz>,
33 created_at: Option<Timestamptz>,
34 initialized_at_cluster_version: Option<String>,
35 created_at_cluster_version: Option<String>,
36}
37
38#[system_catalog(table, "rw_catalog.rw_subscriptions")]
39fn read_rw_subscriptions_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSubscription>> {
40 let catalog_reader = reader.catalog_reader.read_guard();
41 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
42 let user_reader = reader.user_info_reader.read_guard();
43 let current_user = user_reader
44 .get_user_by_name(&reader.auth_context.user_name)
45 .expect("user not found");
46 let users = user_reader.get_all_users();
47 let username_map = user_reader.get_user_name_map();
48
49 Ok(schemas
50 .flat_map(|schema| {
51 schema
52 .iter_subscription_with_acl(current_user)
53 .map(|subscription| RwSubscription {
54 id: subscription.id,
55 name: subscription.name.clone(),
56 schema_id: schema.id(),
57 owner: subscription.owner,
58 definition: subscription.definition.clone(),
59 retention_seconds: subscription.retention_seconds as i64,
60 acl: get_acl_items(subscription.id, false, &users, username_map),
61 initialized_at: subscription
62 .initialized_at_epoch
63 .map(|e| e.as_timestamptz()),
64 created_at: subscription.created_at_epoch.map(|e| e.as_timestamptz()),
65 initialized_at_cluster_version: subscription
66 .initialized_at_cluster_version
67 .clone(),
68 created_at_cluster_version: subscription.created_at_cluster_version.clone(),
69 })
70 })
71 .collect())
72}