risingwave_frontend/catalog/
subscription_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{OBJECT_ID_PLACEHOLDER, TableId, UserId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::catalog::PbSubscription;
18use risingwave_pb::catalog::subscription::PbSubscriptionState;
19
20use super::OwnedByUserCatalog;
21use crate::WithOptions;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::convert_interval_to_u64_seconds;
24
25#[derive(Clone, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(test, derive(Default))]
27pub struct SubscriptionCatalog {
28    /// Id of the subscription. For debug now.
29    pub id: SubscriptionId,
30
31    /// Name of the subscription. For debug now.
32    pub name: String,
33
34    /// Full SQL definition of the subscription. For debug now.
35    pub definition: String,
36
37    /// The retention seconds of the subscription.
38    pub retention_seconds: u64,
39
40    /// The database id
41    pub database_id: u32,
42
43    /// The schema id
44    pub schema_id: u32,
45
46    /// The subscription depends on the upstream list
47    pub dependent_table_id: TableId,
48
49    /// The user id
50    pub owner: UserId,
51
52    pub initialized_at_epoch: Option<Epoch>,
53    pub created_at_epoch: Option<Epoch>,
54
55    pub created_at_cluster_version: Option<String>,
56    pub initialized_at_cluster_version: Option<String>,
57}
58
59#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
60pub struct SubscriptionId {
61    pub subscription_id: u32,
62}
63
64impl SubscriptionId {
65    pub const fn new(subscription_id: u32) -> Self {
66        SubscriptionId { subscription_id }
67    }
68
69    /// Sometimes the id field is filled later, we use this value for better debugging.
70    pub const fn placeholder() -> Self {
71        SubscriptionId {
72            subscription_id: OBJECT_ID_PLACEHOLDER,
73        }
74    }
75
76    pub fn subscription_id(&self) -> u32 {
77        self.subscription_id
78    }
79}
80
81impl SubscriptionCatalog {
82    pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
83        let retention_seconds_str = properties.get("retention").ok_or_else(|| {
84            ErrorCode::InternalError("Subscription retention time not set.".to_owned())
85        })?;
86        let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
87        self.retention_seconds = retention_seconds;
88        Ok(())
89    }
90
91    pub fn create_sql(&self) -> String {
92        self.definition.clone()
93    }
94
95    pub fn to_proto(&self) -> PbSubscription {
96        PbSubscription {
97            id: self.id.subscription_id,
98            name: self.name.clone(),
99            definition: self.definition.clone(),
100            retention_seconds: self.retention_seconds,
101            database_id: self.database_id,
102            schema_id: self.schema_id,
103            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
104            created_at_epoch: self.created_at_epoch.map(|e| e.0),
105            owner: self.owner.into(),
106            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
107            created_at_cluster_version: self.created_at_cluster_version.clone(),
108            dependent_table_id: self.dependent_table_id.table_id,
109            subscription_state: PbSubscriptionState::Init.into(),
110        }
111    }
112}
113
114impl From<&PbSubscription> for SubscriptionCatalog {
115    fn from(prost: &PbSubscription) -> Self {
116        Self {
117            id: SubscriptionId::new(prost.id),
118            name: prost.name.clone(),
119            definition: prost.definition.clone(),
120            retention_seconds: prost.retention_seconds,
121            database_id: prost.database_id,
122            schema_id: prost.schema_id,
123            dependent_table_id: TableId::new(prost.dependent_table_id),
124            owner: prost.owner.into(),
125            created_at_epoch: prost.created_at_epoch.map(Epoch::from),
126            initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
127            created_at_cluster_version: prost.created_at_cluster_version.clone(),
128            initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
129        }
130    }
131}
132
133impl OwnedByUserCatalog for SubscriptionCatalog {
134    fn owner(&self) -> u32 {
135        self.owner.user_id
136    }
137}