risingwave_frontend/catalog/
subscription_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{TableId, UserId};
16pub use risingwave_common::id::SubscriptionId;
17use risingwave_common::id::{DatabaseId, SchemaId};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_pb::catalog::PbSubscription;
20use risingwave_pb::catalog::subscription::PbSubscriptionState;
21
22use super::OwnedByUserCatalog;
23use crate::WithOptions;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::convert_interval_to_u64_seconds;
26
27#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28#[cfg_attr(test, derive(Default))]
29pub struct SubscriptionCatalog {
30    /// Id of the subscription. For debug now.
31    pub id: SubscriptionId,
32
33    /// Name of the subscription. For debug now.
34    pub name: String,
35
36    /// Full SQL definition of the subscription. For debug now.
37    pub definition: String,
38
39    /// The retention seconds of the subscription.
40    pub retention_seconds: u64,
41
42    /// The database id
43    pub database_id: DatabaseId,
44
45    /// The schema id
46    pub schema_id: SchemaId,
47
48    /// The subscription depends on the upstream list
49    pub dependent_table_id: TableId,
50
51    /// The user id
52    pub owner: UserId,
53
54    pub initialized_at_epoch: Option<Epoch>,
55    pub created_at_epoch: Option<Epoch>,
56
57    pub created_at_cluster_version: Option<String>,
58    pub initialized_at_cluster_version: Option<String>,
59
60    pub subscription_state: SubscriptionState,
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
64pub enum SubscriptionState {
65    #[default]
66    Init,
67    Created,
68}
69
70impl SubscriptionCatalog {
71    pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
72        let retention_seconds_str = properties.get("retention").ok_or_else(|| {
73            ErrorCode::InternalError("Subscription retention time not set.".to_owned())
74        })?;
75        let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
76        self.retention_seconds = retention_seconds;
77        Ok(())
78    }
79
80    pub fn create_sql(&self) -> String {
81        self.definition.clone()
82    }
83
84    pub fn to_proto(&self) -> PbSubscription {
85        PbSubscription {
86            id: self.id,
87            name: self.name.clone(),
88            definition: self.definition.clone(),
89            retention_seconds: self.retention_seconds,
90            database_id: self.database_id,
91            schema_id: self.schema_id,
92            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
93            created_at_epoch: self.created_at_epoch.map(|e| e.0),
94            owner: self.owner.into(),
95            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
96            created_at_cluster_version: self.created_at_cluster_version.clone(),
97            dependent_table_id: self.dependent_table_id,
98            subscription_state: match self.subscription_state {
99                SubscriptionState::Init => PbSubscriptionState::Init.into(),
100                SubscriptionState::Created => PbSubscriptionState::Created.into(),
101            },
102        }
103    }
104}
105
106impl From<&PbSubscription> for SubscriptionCatalog {
107    fn from(prost: &PbSubscription) -> Self {
108        Self {
109            id: prost.id,
110            name: prost.name.clone(),
111            definition: prost.definition.clone(),
112            retention_seconds: prost.retention_seconds,
113            database_id: prost.database_id,
114            schema_id: prost.schema_id,
115            dependent_table_id: prost.dependent_table_id,
116            owner: prost.owner.into(),
117            created_at_epoch: prost.created_at_epoch.map(Epoch::from),
118            initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
119            created_at_cluster_version: prost.created_at_cluster_version.clone(),
120            initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
121            subscription_state: match PbSubscriptionState::try_from(prost.subscription_state)
122                .unwrap()
123            {
124                PbSubscriptionState::Init => SubscriptionState::Init,
125                PbSubscriptionState::Created => SubscriptionState::Created,
126                PbSubscriptionState::Unspecified => unreachable!(),
127            },
128        }
129    }
130}
131
132impl OwnedByUserCatalog for SubscriptionCatalog {
133    fn owner(&self) -> u32 {
134        self.owner.user_id
135    }
136}