1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_common::cast::datetime_to_timestamp_millis;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::DatabaseId;
24use risingwave_pb::ddl_service::{DdlProgress, PbBackfillType};
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::unbounded_channel;
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::warn;
32
33use crate::MetaResult;
34use crate::barrier::BarrierManagerRequest::MayHaveSnapshotBackfillingJob;
35use crate::barrier::cdc_progress::CdcProgress;
36use crate::barrier::worker::GlobalBarrierWorker;
37use crate::barrier::{
38 BackfillProgress, BarrierManagerRequest, BarrierManagerStatus, FragmentBackfillProgress,
39 RecoveryReason, UpdateDatabaseBarrierRequest, schedule,
40};
41use crate::hummock::HummockManagerRef;
42use crate::manager::iceberg_v3_sink::IcebergV3SinkManager;
43use crate::manager::sink_coordination::SinkCoordinatorManager;
44use crate::manager::{MetaSrvEnv, MetadataManager};
45use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
46
/// Front-end handle to the global barrier worker.
///
/// Callers use it to query job/backfill progress and to request recovery or
/// per-database barrier updates. Each query is forwarded to the worker as a
/// [`BarrierManagerRequest`] carrying a oneshot reply channel.
pub struct GlobalBarrierManager {
    /// Current barrier manager status, taken from the worker's context in `start`;
    /// read by `check_status_running` and `get_recovery_status`.
    status: Arc<ArcSwap<BarrierManagerStatus>>,
    /// Used to answer `get_hummock_version_id`.
    hummock_manager: HummockManagerRef,
    /// Sending half of the request channel whose receiver is owned by the barrier worker.
    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
    /// Used to list creating jobs when building DDL progress.
    metadata_manager: MetadataManager,
}
53
/// Shared, reference-counted handle to the [`GlobalBarrierManager`] (as returned by `start`).
pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
55
56impl GlobalBarrierManager {
57 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
59 let mut backfill_progress = {
60 let (tx, rx) = oneshot::channel();
61 self.request_tx
62 .send(BarrierManagerRequest::GetBackfillProgress(tx))
63 .context("failed to send get ddl progress request")?;
64 rx.await.context("failed to receive get ddl progress")?
65 };
66 let job_info = self
69 .metadata_manager
70 .catalog_controller
71 .list_creating_jobs(true, true, None)
72 .await?;
73 Ok(job_info
74 .into_iter()
75 .map(
76 |(job_id, definition, init_at, create_type, is_serverless_backfill)| {
77 let BackfillProgress {
78 progress,
79 backfill_type,
80 } = match &mut backfill_progress {
81 Ok(progress) => progress.remove(&job_id).unwrap_or_else(|| {
82 warn!(%job_id, "background job has no ddl progress");
83 BackfillProgress {
84 progress: "0.0%".into(),
85 backfill_type: PbBackfillType::NormalBackfill,
86 }
87 }),
88 Err(e) => BackfillProgress {
89 progress: format!("Err[{}]", e.as_report()),
90 backfill_type: PbBackfillType::NormalBackfill,
91 },
92 };
93 DdlProgress {
94 id: job_id.as_raw_id() as u64,
95 statement: definition,
96 create_type: create_type.as_str().into(),
97 initialized_at_time_millis: datetime_to_timestamp_millis(init_at),
98 progress,
99 is_serverless_backfill,
100 backfill_type: backfill_type as _,
101 }
102 },
103 )
104 .collect())
105 }
106
107 pub(crate) async fn get_fragment_backfill_progress(
108 &self,
109 ) -> MetaResult<Vec<FragmentBackfillProgress>> {
110 let (tx, rx) = oneshot::channel();
111 self.request_tx
112 .send(BarrierManagerRequest::GetFragmentBackfillProgress(tx))
113 .context("failed to send get fragment backfill progress request")?;
114 rx.await
115 .context("failed to receive get fragment backfill progress")?
116 }
117
118 pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
119 let (tx, rx) = oneshot::channel();
120 self.request_tx
121 .send(BarrierManagerRequest::GetCdcProgress(tx))
122 .context("failed to send get ddl progress request")?;
123 rx.await.context("failed to receive get ddl progress")?
124 }
125
126 pub async fn adhoc_recovery(&self) -> MetaResult<()> {
127 let (tx, rx) = oneshot::channel();
128 self.request_tx
129 .send(BarrierManagerRequest::AdhocRecovery(tx))
130 .context("failed to send adhoc recovery request")?;
131 rx.await.context("failed to wait adhoc recovery")?;
132 Ok(())
133 }
134
135 pub async fn update_database_barrier(
136 &self,
137 database_id: DatabaseId,
138 barrier_interval_ms: Option<u32>,
139 checkpoint_frequency: Option<u64>,
140 ) -> MetaResult<()> {
141 let (tx, rx) = oneshot::channel();
142 self.request_tx
143 .send(BarrierManagerRequest::UpdateDatabaseBarrier(
144 UpdateDatabaseBarrierRequest {
145 database_id,
146 barrier_interval_ms,
147 checkpoint_frequency,
148 sender: tx,
149 },
150 ))
151 .context("failed to send update database barrier request")?;
152 rx.await.context("failed to wait update database barrier")?;
153 Ok(())
154 }
155
156 pub async fn may_snapshot_backfilling_job(&self) -> MetaResult<bool> {
157 let (tx, rx) = oneshot::channel();
158 self.request_tx
159 .send(MayHaveSnapshotBackfillingJob(tx))
160 .context("failed to send has snapshot backfilling job request")?;
161 Ok(rx
162 .await
163 .context("failed to wait has snapshot backfilling job")?)
164 }
165
166 pub async fn get_hummock_version_id(&self) -> HummockVersionId {
167 self.hummock_manager.get_version_id().await
168 }
169}
170
171impl GlobalBarrierManager {
172 pub fn check_status_running(&self) -> MetaResult<()> {
174 let status = self.status.load();
175 match &**status {
176 BarrierManagerStatus::Starting
177 | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
178 bail!("The cluster is bootstrapping")
179 }
180 BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
181 Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
182 }
183 BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
184 bail!("The cluster is recovering-adhoc")
185 }
186 BarrierManagerStatus::Running => Ok(()),
187 }
188 }
189
190 pub fn get_recovery_status(&self) -> PbRecoveryStatus {
191 (&**self.status.load()).into()
192 }
193}
194
195impl GlobalBarrierManager {
196 pub async fn start(
197 scheduled_barriers: schedule::ScheduledBarriers,
198 env: MetaSrvEnv,
199 metadata_manager: MetadataManager,
200 hummock_manager: HummockManagerRef,
201 source_manager: SourceManagerRef,
202 sink_manager: SinkCoordinatorManager,
203 iceberg_v3_sink_manager: IcebergV3SinkManager,
204 scale_controller: ScaleControllerRef,
205 barrier_scheduler: schedule::BarrierScheduler,
206 refresh_manager: GlobalRefreshManagerRef,
207 ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
208 let (request_tx, request_rx) = unbounded_channel();
209 let hummock_manager_clone = hummock_manager.clone();
210 let metadata_manager_clone = metadata_manager.clone();
211 let barrier_worker = GlobalBarrierWorker::new(
212 scheduled_barriers,
213 env,
214 metadata_manager,
215 hummock_manager,
216 source_manager,
217 sink_manager,
218 iceberg_v3_sink_manager,
219 scale_controller,
220 request_rx,
221 barrier_scheduler,
222 refresh_manager,
223 )
224 .await;
225 let manager = Self {
226 status: barrier_worker.context.status(),
227 hummock_manager: hummock_manager_clone,
228 request_tx,
229 metadata_manager: metadata_manager_clone,
230 };
231 let (join_handle, shutdown_tx) = barrier_worker.start();
232 (Arc::new(manager), join_handle, shutdown_tx)
233 }
234}