risingwave_frontend/catalog/
subscription_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{OBJECT_ID_PLACEHOLDER, TableId, UserId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::catalog::PbSubscription;
18use risingwave_pb::catalog::subscription::PbSubscriptionState;
19
20use super::OwnedByUserCatalog;
21use crate::WithOptions;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::convert_interval_to_u64_seconds;
24
/// Catalog entry describing a subscription.
///
/// Convertible to a `PbSubscription` via [`SubscriptionCatalog::to_proto`] and from one via
/// `From<&PbSubscription>`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct SubscriptionCatalog {
    /// Id of the subscription. For debug now.
    pub id: SubscriptionId,

    /// Name of the subscription. For debug now.
    pub name: String,

    /// Full SQL definition of the subscription. For debug now.
    /// This is the string returned by [`SubscriptionCatalog::create_sql`].
    pub definition: String,

    /// Retention period of the subscription, in seconds.
    /// Populated from the `retention` option by
    /// [`SubscriptionCatalog::set_retention_seconds`].
    pub retention_seconds: u64,

    /// The database id.
    pub database_id: u32,

    /// The schema id.
    pub schema_id: u32,

    /// Id of the upstream table this subscription depends on.
    pub dependent_table_id: TableId,

    /// Id of the user that owns the subscription.
    pub owner: UserId,

    /// Epoch recorded for the subscription's initialization, if any.
    pub initialized_at_epoch: Option<Epoch>,
    /// Epoch recorded for the subscription's creation, if any.
    pub created_at_epoch: Option<Epoch>,

    /// Cluster version recorded for the subscription's creation, if any.
    pub created_at_cluster_version: Option<String>,
    /// Cluster version recorded for the subscription's initialization, if any.
    pub initialized_at_cluster_version: Option<String>,

    /// Current state of the subscription; see [`SubscriptionState`].
    pub subscription_state: SubscriptionState,
}
60
/// State of a subscription as tracked by [`SubscriptionCatalog`].
///
/// Mirrors the `Init` and `Created` variants of `PbSubscriptionState`; the protobuf
/// `Unspecified` variant has no counterpart here.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub enum SubscriptionState {
    /// The default state.
    #[default]
    Init,
    /// Counterpart of `PbSubscriptionState::Created`.
    Created,
}
67
/// Identifier of a subscription, wrapping the raw `u32` id.
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct SubscriptionId {
    /// The raw numeric id.
    pub subscription_id: u32,
}
72
73impl SubscriptionId {
74    pub const fn new(subscription_id: u32) -> Self {
75        SubscriptionId { subscription_id }
76    }
77
78    /// Sometimes the id field is filled later, we use this value for better debugging.
79    pub const fn placeholder() -> Self {
80        SubscriptionId {
81            subscription_id: OBJECT_ID_PLACEHOLDER,
82        }
83    }
84
85    pub fn subscription_id(&self) -> u32 {
86        self.subscription_id
87    }
88}
89
90impl SubscriptionCatalog {
91    pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
92        let retention_seconds_str = properties.get("retention").ok_or_else(|| {
93            ErrorCode::InternalError("Subscription retention time not set.".to_owned())
94        })?;
95        let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
96        self.retention_seconds = retention_seconds;
97        Ok(())
98    }
99
100    pub fn create_sql(&self) -> String {
101        self.definition.clone()
102    }
103
104    pub fn to_proto(&self) -> PbSubscription {
105        PbSubscription {
106            id: self.id.subscription_id,
107            name: self.name.clone(),
108            definition: self.definition.clone(),
109            retention_seconds: self.retention_seconds,
110            database_id: self.database_id,
111            schema_id: self.schema_id,
112            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
113            created_at_epoch: self.created_at_epoch.map(|e| e.0),
114            owner: self.owner.into(),
115            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
116            created_at_cluster_version: self.created_at_cluster_version.clone(),
117            dependent_table_id: self.dependent_table_id.table_id,
118            subscription_state: match self.subscription_state {
119                SubscriptionState::Init => PbSubscriptionState::Init.into(),
120                SubscriptionState::Created => PbSubscriptionState::Created.into(),
121            },
122        }
123    }
124}
125
126impl From<&PbSubscription> for SubscriptionCatalog {
127    fn from(prost: &PbSubscription) -> Self {
128        Self {
129            id: SubscriptionId::new(prost.id),
130            name: prost.name.clone(),
131            definition: prost.definition.clone(),
132            retention_seconds: prost.retention_seconds,
133            database_id: prost.database_id,
134            schema_id: prost.schema_id,
135            dependent_table_id: TableId::new(prost.dependent_table_id),
136            owner: prost.owner.into(),
137            created_at_epoch: prost.created_at_epoch.map(Epoch::from),
138            initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
139            created_at_cluster_version: prost.created_at_cluster_version.clone(),
140            initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
141            subscription_state: match PbSubscriptionState::try_from(prost.subscription_state)
142                .unwrap()
143            {
144                PbSubscriptionState::Init => SubscriptionState::Init,
145                PbSubscriptionState::Created => SubscriptionState::Created,
146                PbSubscriptionState::Unspecified => unreachable!(),
147            },
148        }
149    }
150}
151
152impl OwnedByUserCatalog for SubscriptionCatalog {
153    fn owner(&self) -> u32 {
154        self.owner.user_id
155    }
156}