// risingwave_meta/barrier/manager.rs
use std::collections::hash_map::Entry;
use std::sync::Arc;
use anyhow::Context;
use arc_swap::ArcSwap;
use risingwave_common::bail;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use crate::barrier::worker::GlobalBarrierWorker;
use crate::barrier::{schedule, BarrierManagerRequest, BarrierManagerStatus, RecoveryReason};
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::MetaResult;
/// Front-end handle for the global barrier subsystem.
///
/// `start` creates a `GlobalBarrierWorker`, gives it the receiving end of the request
/// channel, and keeps the sending end here together with the handles that the query
/// methods of this type read from.
pub struct GlobalBarrierManager {
/// Shared status cell taken from the worker's context in `start`; read by
/// `check_status_running` and `get_recovery_status`.
status: Arc<ArcSwap<BarrierManagerStatus>>,
/// Used by `get_hummock_version_id` to query the current version id.
hummock_manager: HummockManagerRef,
/// Sending end of the unbounded request channel whose receiver is handed to the
/// worker in `start`; `get_ddl_progress` sends its request through it.
request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
/// Used by `get_ddl_progress` to list background-creating materialized views
/// through its `catalog_controller`.
metadata_manager: MetadataManager,
}
/// Shared, reference-counted handle to a [`GlobalBarrierManager`].
pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
impl GlobalBarrierManager {
    /// Returns the progress of in-flight DDL jobs.
    ///
    /// The progress map is requested from the barrier worker over the request channel.
    /// Every materialized view returned by `list_background_creating_mviews` that has no
    /// entry in that map is then added with a placeholder progress of `"0.0%"`, so it is
    /// still present in the result.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent to the worker, if the reply channel
    /// is closed before an answer arrives, or if listing the background-creating
    /// materialized views fails.
    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        let mut ddl_progress = {
            let (tx, rx) = oneshot::channel();
            self.request_tx
                .send(BarrierManagerRequest::GetDdlProgress(tx))
                .context("failed to send get ddl progress request")?;
            rx.await.context("failed to receive get ddl progress")?
        };

        // Propagate a catalog failure to the caller instead of panicking: this function
        // already returns `MetaResult`, so there is no reason to abort the task here.
        // NOTE(review): the meaning of the `true` argument is defined by the catalog
        // controller and is not visible from this file.
        let mviews = self
            .metadata_manager
            .catalog_controller
            .list_background_creating_mviews(true)
            .await?;

        for mview in mviews {
            // Only fill in jobs the worker did not report; reported progress wins.
            if let Entry::Vacant(e) = ddl_progress.entry(mview.table_id as _) {
                e.insert(DdlProgress {
                    id: mview.table_id as u64,
                    statement: mview.definition,
                    progress: "0.0%".into(),
                });
            }
        }

        Ok(ddl_progress.into_values().collect())
    }

    /// Returns the version id reported by the hummock manager.
    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
        self.hummock_manager.get_version_id().await
    }
}
impl GlobalBarrierManager {
pub fn check_status_running(&self) -> MetaResult<()> {
let status = self.status.load();
match &**status {
BarrierManagerStatus::Starting
| BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
bail!("The cluster is bootstrapping")
}
BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
}
BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
bail!("The cluster is recovering-adhoc")
}
BarrierManagerStatus::Running => Ok(()),
}
}
pub fn get_recovery_status(&self) -> PbRecoveryStatus {
(&**self.status.load()).into()
}
}
impl GlobalBarrierManager {
    /// Creates the barrier worker together with the manager handle that talks to it, then
    /// starts the worker.
    ///
    /// An unbounded request channel is created; its receiver goes to the worker and its
    /// sender is stored in the returned manager. The manager also keeps clones of the
    /// hummock and metadata managers, and the status handle obtained from the worker's
    /// context.
    ///
    /// Returns the shared manager plus the `JoinHandle` and `oneshot::Sender` produced by
    /// the worker's `start` call (the latter is named `shutdown_tx` here; presumably it is
    /// used to stop the worker — confirm against the worker implementation).
    pub async fn start(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
        let (request_tx, request_rx) = unbounded_channel();

        // The originals are moved into the worker below, so clone the handles the
        // manager keeps for itself first.
        let manager_hummock = hummock_manager.clone();
        let manager_metadata = metadata_manager.clone();

        let worker = GlobalBarrierWorker::new(
            scheduled_barriers,
            env,
            metadata_manager,
            hummock_manager,
            source_manager,
            sink_manager,
            scale_controller,
            request_rx,
        )
        .await;

        // Take the status handle before calling `start` on the worker.
        let status = worker.context.status();
        let (join_handle, shutdown_tx) = worker.start();

        let manager = Arc::new(Self {
            status,
            hummock_manager: manager_hummock,
            request_tx,
            metadata_manager: manager_metadata,
        });
        (manager, join_handle, shutdown_tx)
    }
}