risingwave_meta/barrier/
manager.rs1use std::collections::hash_map::Entry;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::meta::PbRecoveryStatus;
24use tokio::sync::mpsc::unbounded_channel;
25use tokio::sync::{mpsc, oneshot};
26use tokio::task::JoinHandle;
27use tracing::warn;
28
29use crate::MetaResult;
30use crate::barrier::worker::GlobalBarrierWorker;
31use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
32use crate::hummock::HummockManagerRef;
33use crate::manager::sink_coordination::SinkCoordinatorManager;
34use crate::manager::{MetaSrvEnv, MetadataManager};
35use crate::stream::{ScaleControllerRef, SourceManagerRef};
36
37pub struct GlobalBarrierManager {
38 status: Arc<ArcSwap<BarrierManagerStatus>>,
39 hummock_manager: HummockManagerRef,
40 request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
41 metadata_manager: MetadataManager,
42}
43
44pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
45
46impl GlobalBarrierManager {
47 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
49 let mut ddl_progress = {
50 let (tx, rx) = oneshot::channel();
51 self.request_tx
52 .send(BarrierManagerRequest::GetDdlProgress(tx))
53 .context("failed to send get ddl progress request")?;
54 rx.await.context("failed to receive get ddl progress")?
55 };
56 let mviews = self
59 .metadata_manager
60 .catalog_controller
61 .list_background_creating_mviews(true)
62 .await?;
63 for mview in mviews {
64 if let Entry::Vacant(e) = ddl_progress.entry(mview.table_id as _) {
65 warn!(
66 job_id = mview.table_id,
67 "background job has no ddl progress"
68 );
69 e.insert(DdlProgress {
70 id: mview.table_id as u64,
71 statement: mview.definition,
72 progress: "0.0%".into(),
73 });
74 }
75 }
76
77 Ok(ddl_progress.into_values().collect())
78 }
79
80 pub async fn adhoc_recovery(&self) -> MetaResult<()> {
81 let (tx, rx) = oneshot::channel();
82 self.request_tx
83 .send(BarrierManagerRequest::AdhocRecovery(tx))
84 .context("failed to send adhoc recovery request")?;
85 rx.await.context("failed to wait adhoc recovery")?;
86 Ok(())
87 }
88
89 pub async fn get_hummock_version_id(&self) -> HummockVersionId {
90 self.hummock_manager.get_version_id().await
91 }
92}
93
94impl GlobalBarrierManager {
95 pub fn check_status_running(&self) -> MetaResult<()> {
97 let status = self.status.load();
98 match &**status {
99 BarrierManagerStatus::Starting
100 | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
101 bail!("The cluster is bootstrapping")
102 }
103 BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
104 Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
105 }
106 BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
107 bail!("The cluster is recovering-adhoc")
108 }
109 BarrierManagerStatus::Running => Ok(()),
110 }
111 }
112
113 pub fn get_recovery_status(&self) -> PbRecoveryStatus {
114 (&**self.status.load()).into()
115 }
116}
117
118impl GlobalBarrierManager {
119 pub async fn start(
120 scheduled_barriers: schedule::ScheduledBarriers,
121 env: MetaSrvEnv,
122 metadata_manager: MetadataManager,
123 hummock_manager: HummockManagerRef,
124 source_manager: SourceManagerRef,
125 sink_manager: SinkCoordinatorManager,
126 scale_controller: ScaleControllerRef,
127 ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
128 let (request_tx, request_rx) = unbounded_channel();
129 let hummock_manager_clone = hummock_manager.clone();
130 let metadata_manager_clone = metadata_manager.clone();
131 let barrier_worker = GlobalBarrierWorker::new(
132 scheduled_barriers,
133 env,
134 metadata_manager,
135 hummock_manager,
136 source_manager,
137 sink_manager,
138 scale_controller,
139 request_rx,
140 )
141 .await;
142 let manager = Self {
143 status: barrier_worker.context.status(),
144 hummock_manager: hummock_manager_clone,
145 request_tx,
146 metadata_manager: metadata_manager_clone,
147 };
148 let (join_handle, shutdown_tx) = barrier_worker.start();
149 (Arc::new(manager), join_handle, shutdown_tx)
150 }
151}