risingwave_meta/manager/
notification_version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta_model::catalog_version;
16use risingwave_meta_model::catalog_version::VersionCategory;
17use risingwave_meta_model::prelude::CatalogVersion;
18use sea_orm::ActiveValue::Set;
19use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
20
21use crate::MetaResult;
22use crate::controller::SqlMetaStore;
23
24pub struct NotificationVersionGenerator {
25    current_version: u64,
26    conn: DatabaseConnection,
27}
28
29// TODO: add pre-allocation if necessary
30impl NotificationVersionGenerator {
31    pub async fn new(meta_store_impl: SqlMetaStore) -> MetaResult<Self> {
32        let txn = meta_store_impl.conn.begin().await?;
33        let model = CatalogVersion::find_by_id(VersionCategory::Notification)
34            .one(&txn)
35            .await?;
36        let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64;
37        if model.is_none() {
38            CatalogVersion::insert(catalog_version::ActiveModel {
39                name: Set(VersionCategory::Notification),
40                version: Set(1),
41            })
42            .exec(&txn)
43            .await?;
44            txn.commit().await?;
45        }
46
47        Ok(Self {
48            current_version,
49            conn: meta_store_impl.conn,
50        })
51    }
52
53    pub fn current_version(&self) -> u64 {
54        self.current_version
55    }
56
57    pub async fn increase_version(&mut self) {
58        catalog_version::ActiveModel {
59            name: Set(VersionCategory::Notification),
60            version: Set((self.current_version + 1) as i64),
61        }
62        .update(&self.conn)
63        .await
64        .unwrap();
65        self.current_version += 1;
66    }
67}