risingwave_meta/manager/
notification_version.rs1use risingwave_meta_model::catalog_version;
16use risingwave_meta_model::catalog_version::VersionCategory;
17use risingwave_meta_model::prelude::CatalogVersion;
18use sea_orm::ActiveValue::Set;
19use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
20
21use crate::MetaResult;
22use crate::controller::SqlMetaStore;
23
24pub struct NotificationVersionGenerator {
25 current_version: u64,
26 conn: DatabaseConnection,
27}
28
29impl NotificationVersionGenerator {
31 pub async fn new(meta_store_impl: SqlMetaStore) -> MetaResult<Self> {
32 let txn = meta_store_impl.conn.begin().await?;
33 let model = CatalogVersion::find_by_id(VersionCategory::Notification)
34 .one(&txn)
35 .await?;
36 let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64;
37 if model.is_none() {
38 CatalogVersion::insert(catalog_version::ActiveModel {
39 name: Set(VersionCategory::Notification),
40 version: Set(1),
41 })
42 .exec(&txn)
43 .await?;
44 txn.commit().await?;
45 }
46
47 Ok(Self {
48 current_version,
49 conn: meta_store_impl.conn,
50 })
51 }
52
53 pub fn current_version(&self) -> u64 {
54 self.current_version
55 }
56
57 pub async fn increase_version(&mut self) {
58 catalog_version::ActiveModel {
59 name: Set(VersionCategory::Notification),
60 version: Set((self.current_version + 1) as i64),
61 }
62 .update(&self.conn)
63 .await
64 .unwrap();
65 self.current_version += 1;
66 }
67}