risingwave_frontend/catalog/
subscription_catalog.rs1use risingwave_common::catalog::{TableId, UserId};
16pub use risingwave_common::id::SubscriptionId;
17use risingwave_common::id::{DatabaseId, SchemaId};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_pb::catalog::PbSubscription;
20use risingwave_pb::catalog::subscription::PbSubscriptionState;
21
22use super::OwnedByUserCatalog;
23use crate::WithOptions;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::convert_interval_to_u64_seconds;
26
27#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28#[cfg_attr(test, derive(Default))]
29pub struct SubscriptionCatalog {
30 pub id: SubscriptionId,
32
33 pub name: String,
35
36 pub definition: String,
38
39 pub retention_seconds: u64,
41
42 pub database_id: DatabaseId,
44
45 pub schema_id: SchemaId,
47
48 pub dependent_table_id: TableId,
50
51 pub owner: UserId,
53
54 pub initialized_at_epoch: Option<Epoch>,
55 pub created_at_epoch: Option<Epoch>,
56
57 pub created_at_cluster_version: Option<String>,
58 pub initialized_at_cluster_version: Option<String>,
59
60 pub subscription_state: SubscriptionState,
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
64pub enum SubscriptionState {
65 #[default]
66 Init,
67 Created,
68}
69
70impl SubscriptionCatalog {
71 pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
72 let retention_seconds_str = properties.get("retention").ok_or_else(|| {
73 ErrorCode::InternalError("Subscription retention time not set.".to_owned())
74 })?;
75 let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
76 self.retention_seconds = retention_seconds;
77 Ok(())
78 }
79
80 pub fn create_sql(&self) -> String {
81 self.definition.clone()
82 }
83
84 pub fn to_proto(&self) -> PbSubscription {
85 PbSubscription {
86 id: self.id,
87 name: self.name.clone(),
88 definition: self.definition.clone(),
89 retention_seconds: self.retention_seconds,
90 database_id: self.database_id,
91 schema_id: self.schema_id,
92 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
93 created_at_epoch: self.created_at_epoch.map(|e| e.0),
94 owner: self.owner.into(),
95 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
96 created_at_cluster_version: self.created_at_cluster_version.clone(),
97 dependent_table_id: self.dependent_table_id,
98 subscription_state: match self.subscription_state {
99 SubscriptionState::Init => PbSubscriptionState::Init.into(),
100 SubscriptionState::Created => PbSubscriptionState::Created.into(),
101 },
102 }
103 }
104}
105
106impl From<&PbSubscription> for SubscriptionCatalog {
107 fn from(prost: &PbSubscription) -> Self {
108 Self {
109 id: prost.id,
110 name: prost.name.clone(),
111 definition: prost.definition.clone(),
112 retention_seconds: prost.retention_seconds,
113 database_id: prost.database_id,
114 schema_id: prost.schema_id,
115 dependent_table_id: prost.dependent_table_id,
116 owner: prost.owner.into(),
117 created_at_epoch: prost.created_at_epoch.map(Epoch::from),
118 initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
119 created_at_cluster_version: prost.created_at_cluster_version.clone(),
120 initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
121 subscription_state: match PbSubscriptionState::try_from(prost.subscription_state)
122 .unwrap()
123 {
124 PbSubscriptionState::Init => SubscriptionState::Init,
125 PbSubscriptionState::Created => SubscriptionState::Created,
126 PbSubscriptionState::Unspecified => unreachable!(),
127 },
128 }
129 }
130}
131
132impl OwnedByUserCatalog for SubscriptionCatalog {
133 fn owner(&self) -> u32 {
134 self.owner.user_id
135 }
136}