risingwave_meta/barrier/
manager.rs1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use anyhow::Context;
20use arc_swap::ArcSwap;
21use risingwave_common::bail;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::{CreateType, DatabaseId};
24use risingwave_pb::ddl_service::DdlProgress;
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use tokio::sync::mpsc::unbounded_channel;
28use tokio::sync::{mpsc, oneshot};
29use tokio::task::JoinHandle;
30use tracing::warn;
31
32use crate::MetaResult;
33use crate::barrier::cdc_progress::CdcProgress;
34use crate::barrier::worker::GlobalBarrierWorker;
35use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
36use crate::hummock::HummockManagerRef;
37use crate::manager::sink_coordination::SinkCoordinatorManager;
38use crate::manager::{MetaSrvEnv, MetadataManager};
39use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
40
41pub struct GlobalBarrierManager {
42 status: Arc<ArcSwap<BarrierManagerStatus>>,
43 hummock_manager: HummockManagerRef,
44 request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
45 metadata_manager: MetadataManager,
46}
47
48pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
49
50impl GlobalBarrierManager {
51 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
53 let mut ddl_progress = {
54 let (tx, rx) = oneshot::channel();
55 self.request_tx
56 .send(BarrierManagerRequest::GetDdlProgress(tx))
57 .context("failed to send get ddl progress request")?;
58 rx.await.context("failed to receive get ddl progress")?
59 };
60 let job_info = self
63 .metadata_manager
64 .catalog_controller
65 .list_background_creating_jobs(true, None)
66 .await?;
67 for (job_id, definition, _init_at) in job_info {
68 if let Entry::Vacant(e) = ddl_progress.entry(job_id) {
69 warn!(%job_id, "background job has no ddl progress");
70 e.insert(DdlProgress {
71 id: job_id.as_raw_id() as u64,
72 statement: definition,
73 create_type: CreateType::Background.as_str().into(),
74 progress: "0.0%".into(),
75 });
76 }
77 }
78
79 Ok(ddl_progress.into_values().collect())
80 }
81
82 pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
83 let (tx, rx) = oneshot::channel();
84 self.request_tx
85 .send(BarrierManagerRequest::GetCdcProgress(tx))
86 .context("failed to send get ddl progress request")?;
87 Ok(rx.await.context("failed to receive get ddl progress")?)
88 }
89
90 pub async fn adhoc_recovery(&self) -> MetaResult<()> {
91 let (tx, rx) = oneshot::channel();
92 self.request_tx
93 .send(BarrierManagerRequest::AdhocRecovery(tx))
94 .context("failed to send adhoc recovery request")?;
95 rx.await.context("failed to wait adhoc recovery")?;
96 Ok(())
97 }
98
99 pub async fn update_database_barrier(
100 &self,
101 database_id: DatabaseId,
102 barrier_interval_ms: Option<u32>,
103 checkpoint_frequency: Option<u64>,
104 ) -> MetaResult<()> {
105 let (tx, rx) = oneshot::channel();
106 self.request_tx
107 .send(BarrierManagerRequest::UpdateDatabaseBarrier {
108 database_id,
109 barrier_interval_ms,
110 checkpoint_frequency,
111 sender: tx,
112 })
113 .context("failed to send update database barrier request")?;
114 rx.await.context("failed to wait update database barrier")?;
115 Ok(())
116 }
117
118 pub async fn get_hummock_version_id(&self) -> HummockVersionId {
119 self.hummock_manager.get_version_id().await
120 }
121}
122
123impl GlobalBarrierManager {
124 pub fn check_status_running(&self) -> MetaResult<()> {
126 let status = self.status.load();
127 match &**status {
128 BarrierManagerStatus::Starting
129 | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
130 bail!("The cluster is bootstrapping")
131 }
132 BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
133 Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
134 }
135 BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
136 bail!("The cluster is recovering-adhoc")
137 }
138 BarrierManagerStatus::Running => Ok(()),
139 }
140 }
141
142 pub fn get_recovery_status(&self) -> PbRecoveryStatus {
143 (&**self.status.load()).into()
144 }
145}
146
147impl GlobalBarrierManager {
148 pub async fn start(
149 scheduled_barriers: schedule::ScheduledBarriers,
150 env: MetaSrvEnv,
151 metadata_manager: MetadataManager,
152 hummock_manager: HummockManagerRef,
153 source_manager: SourceManagerRef,
154 sink_manager: SinkCoordinatorManager,
155 scale_controller: ScaleControllerRef,
156 barrier_scheduler: schedule::BarrierScheduler,
157 refresh_manager: GlobalRefreshManagerRef,
158 ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
159 let (request_tx, request_rx) = unbounded_channel();
160 let hummock_manager_clone = hummock_manager.clone();
161 let metadata_manager_clone = metadata_manager.clone();
162 let barrier_worker = GlobalBarrierWorker::new(
163 scheduled_barriers,
164 env,
165 metadata_manager,
166 hummock_manager,
167 source_manager,
168 sink_manager,
169 scale_controller,
170 request_rx,
171 barrier_scheduler,
172 refresh_manager,
173 )
174 .await;
175 let manager = Self {
176 status: barrier_worker.context.status(),
177 hummock_manager: hummock_manager_clone,
178 request_tx,
179 metadata_manager: metadata_manager_clone,
180 };
181 let (join_handle, shutdown_tx) = barrier_worker.start();
182 (Arc::new(manager), join_handle, shutdown_tx)
183 }
184}