// risingwave_meta/manager/notification_version.rs
use risingwave_meta_model::catalog_version;
use risingwave_meta_model::catalog_version::VersionCategory;
use risingwave_meta_model::prelude::CatalogVersion;
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
use crate::controller::SqlMetaStore;
use crate::MetaResult;
/// Tracks the notification version and persists it through the SQL meta store.
///
/// The version is stored as the catalog-version row keyed by
/// `VersionCategory::Notification`; this struct keeps an in-memory copy of it.
pub struct NotificationVersionGenerator {
/// In-memory copy of the most recently persisted notification version.
current_version: u64,
/// Connection used to write version updates back to the meta store.
conn: DatabaseConnection,
}
impl NotificationVersionGenerator {
    /// Builds a generator from the persisted notification version.
    ///
    /// Looks up the catalog-version row keyed by `VersionCategory::Notification`
    /// inside a transaction. When the row exists its stored version is used;
    /// otherwise a row with version `1` is inserted, the transaction is
    /// committed, and the generator starts at `1`.
    ///
    /// # Errors
    ///
    /// Propagates any error from opening the transaction, running the lookup,
    /// inserting the initial row, or committing.
    pub async fn new(meta_store_impl: SqlMetaStore) -> MetaResult<Self> {
        let txn = meta_store_impl.conn.begin().await?;
        let existing = CatalogVersion::find_by_id(VersionCategory::Notification)
            .one(&txn)
            .await?;

        let current_version = match existing {
            // Read-only path: the transaction is never explicitly committed here and is
            // simply dropped on return (presumably rolled back by sea_orm — confirm).
            Some(row) => row.version as u64,
            None => {
                let initial_row = catalog_version::ActiveModel {
                    name: Set(VersionCategory::Notification),
                    version: Set(1),
                };
                CatalogVersion::insert(initial_row).exec(&txn).await?;
                txn.commit().await?;
                1
            }
        };

        Ok(Self {
            current_version,
            conn: meta_store_impl.conn,
        })
    }

    /// Returns the in-memory notification version.
    pub fn current_version(&self) -> u64 {
        self.current_version
    }

    /// Persists `current_version + 1` and then bumps the in-memory version.
    ///
    /// The in-memory value is only advanced after the store update succeeds.
    ///
    /// # Panics
    ///
    /// Panics if updating the row in the meta store fails.
    pub async fn increase_version(&mut self) {
        let next_version = self.current_version + 1;
        let updated_row = catalog_version::ActiveModel {
            name: Set(VersionCategory::Notification),
            version: Set(next_version as i64),
        };
        updated_row.update(&self.conn).await.unwrap();
        self.current_version = next_version;
    }
}