risingwave_frontend/catalog/
subscription_catalog.rs1use risingwave_common::catalog::{OBJECT_ID_PLACEHOLDER, TableId, UserId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::catalog::PbSubscription;
18use risingwave_pb::catalog::subscription::PbSubscriptionState;
19
20use super::OwnedByUserCatalog;
21use crate::WithOptions;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::convert_interval_to_u64_seconds;
24
25#[derive(Clone, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(test, derive(Default))]
27pub struct SubscriptionCatalog {
28 pub id: SubscriptionId,
30
31 pub name: String,
33
34 pub definition: String,
36
37 pub retention_seconds: u64,
39
40 pub database_id: u32,
42
43 pub schema_id: u32,
45
46 pub dependent_table_id: TableId,
48
49 pub owner: UserId,
51
52 pub initialized_at_epoch: Option<Epoch>,
53 pub created_at_epoch: Option<Epoch>,
54
55 pub created_at_cluster_version: Option<String>,
56 pub initialized_at_cluster_version: Option<String>,
57}
58
/// Strongly-typed wrapper around the raw `u32` identifier of a subscription.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubscriptionId {
    /// The raw numeric identifier.
    pub subscription_id: u32,
}
63
64impl SubscriptionId {
65 pub const fn new(subscription_id: u32) -> Self {
66 SubscriptionId { subscription_id }
67 }
68
69 pub const fn placeholder() -> Self {
71 SubscriptionId {
72 subscription_id: OBJECT_ID_PLACEHOLDER,
73 }
74 }
75
76 pub fn subscription_id(&self) -> u32 {
77 self.subscription_id
78 }
79}
80
81impl SubscriptionCatalog {
82 pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
83 let retention_seconds_str = properties.get("retention").ok_or_else(|| {
84 ErrorCode::InternalError("Subscription retention time not set.".to_owned())
85 })?;
86 let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
87 self.retention_seconds = retention_seconds;
88 Ok(())
89 }
90
91 pub fn create_sql(&self) -> String {
92 self.definition.clone()
93 }
94
95 pub fn to_proto(&self) -> PbSubscription {
96 PbSubscription {
97 id: self.id.subscription_id,
98 name: self.name.clone(),
99 definition: self.definition.clone(),
100 retention_seconds: self.retention_seconds,
101 database_id: self.database_id,
102 schema_id: self.schema_id,
103 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
104 created_at_epoch: self.created_at_epoch.map(|e| e.0),
105 owner: self.owner.into(),
106 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
107 created_at_cluster_version: self.created_at_cluster_version.clone(),
108 dependent_table_id: self.dependent_table_id.table_id,
109 subscription_state: PbSubscriptionState::Init.into(),
110 }
111 }
112}
113
114impl From<&PbSubscription> for SubscriptionCatalog {
115 fn from(prost: &PbSubscription) -> Self {
116 Self {
117 id: SubscriptionId::new(prost.id),
118 name: prost.name.clone(),
119 definition: prost.definition.clone(),
120 retention_seconds: prost.retention_seconds,
121 database_id: prost.database_id,
122 schema_id: prost.schema_id,
123 dependent_table_id: TableId::new(prost.dependent_table_id),
124 owner: prost.owner.into(),
125 created_at_epoch: prost.created_at_epoch.map(Epoch::from),
126 initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
127 created_at_cluster_version: prost.created_at_cluster_version.clone(),
128 initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
129 }
130 }
131}
132
133impl OwnedByUserCatalog for SubscriptionCatalog {
134 fn owner(&self) -> u32 {
135 self.owner.user_id
136 }
137}