risingwave_frontend/catalog/
subscription_catalog.rs1use risingwave_common::catalog::{OBJECT_ID_PLACEHOLDER, TableId, UserId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::catalog::PbSubscription;
18use risingwave_pb::catalog::subscription::PbSubscriptionState;
19
20use super::OwnedByUserCatalog;
21use crate::WithOptions;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::convert_interval_to_u64_seconds;
24
25#[derive(Clone, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(test, derive(Default))]
27pub struct SubscriptionCatalog {
28 pub id: SubscriptionId,
30
31 pub name: String,
33
34 pub definition: String,
36
37 pub retention_seconds: u64,
39
40 pub database_id: u32,
42
43 pub schema_id: u32,
45
46 pub dependent_table_id: TableId,
48
49 pub owner: UserId,
51
52 pub initialized_at_epoch: Option<Epoch>,
53 pub created_at_epoch: Option<Epoch>,
54
55 pub created_at_cluster_version: Option<String>,
56 pub initialized_at_cluster_version: Option<String>,
57
58 pub subscription_state: SubscriptionState,
59}
60
/// Lifecycle state of a subscription as tracked by the frontend catalog.
///
/// Mirrors the `Init` and `Created` values of `PbSubscriptionState`; the protobuf
/// `Unspecified` value has no counterpart here.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub enum SubscriptionState {
    /// Default state.
    #[default]
    Init,
    /// The subscription has been created.
    Created,
}
67
/// Typed wrapper around the raw `u32` id of a subscription.
///
/// It is `Copy`, orders and hashes by the wrapped id, and defaults to id `0`.
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct SubscriptionId {
    /// The raw numeric id.
    pub subscription_id: u32,
}
72
73impl SubscriptionId {
74 pub const fn new(subscription_id: u32) -> Self {
75 SubscriptionId { subscription_id }
76 }
77
78 pub const fn placeholder() -> Self {
80 SubscriptionId {
81 subscription_id: OBJECT_ID_PLACEHOLDER,
82 }
83 }
84
85 pub fn subscription_id(&self) -> u32 {
86 self.subscription_id
87 }
88}
89
90impl SubscriptionCatalog {
91 pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
92 let retention_seconds_str = properties.get("retention").ok_or_else(|| {
93 ErrorCode::InternalError("Subscription retention time not set.".to_owned())
94 })?;
95 let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
96 self.retention_seconds = retention_seconds;
97 Ok(())
98 }
99
100 pub fn create_sql(&self) -> String {
101 self.definition.clone()
102 }
103
104 pub fn to_proto(&self) -> PbSubscription {
105 PbSubscription {
106 id: self.id.subscription_id,
107 name: self.name.clone(),
108 definition: self.definition.clone(),
109 retention_seconds: self.retention_seconds,
110 database_id: self.database_id,
111 schema_id: self.schema_id,
112 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
113 created_at_epoch: self.created_at_epoch.map(|e| e.0),
114 owner: self.owner.into(),
115 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
116 created_at_cluster_version: self.created_at_cluster_version.clone(),
117 dependent_table_id: self.dependent_table_id.table_id,
118 subscription_state: match self.subscription_state {
119 SubscriptionState::Init => PbSubscriptionState::Init.into(),
120 SubscriptionState::Created => PbSubscriptionState::Created.into(),
121 },
122 }
123 }
124}
125
126impl From<&PbSubscription> for SubscriptionCatalog {
127 fn from(prost: &PbSubscription) -> Self {
128 Self {
129 id: SubscriptionId::new(prost.id),
130 name: prost.name.clone(),
131 definition: prost.definition.clone(),
132 retention_seconds: prost.retention_seconds,
133 database_id: prost.database_id,
134 schema_id: prost.schema_id,
135 dependent_table_id: TableId::new(prost.dependent_table_id),
136 owner: prost.owner.into(),
137 created_at_epoch: prost.created_at_epoch.map(Epoch::from),
138 initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
139 created_at_cluster_version: prost.created_at_cluster_version.clone(),
140 initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
141 subscription_state: match PbSubscriptionState::try_from(prost.subscription_state)
142 .unwrap()
143 {
144 PbSubscriptionState::Init => SubscriptionState::Init,
145 PbSubscriptionState::Created => SubscriptionState::Created,
146 PbSubscriptionState::Unspecified => unreachable!(),
147 },
148 }
149 }
150}
151
152impl OwnedByUserCatalog for SubscriptionCatalog {
153 fn owner(&self) -> u32 {
154 self.owner.user_id
155 }
156}