1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_common::cast::datetime_to_timestamp_millis;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::DatabaseId;
24use risingwave_pb::ddl_service::{DdlProgress, PbBackfillType};
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::unbounded_channel;
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::warn;
32
33use crate::MetaResult;
34use crate::barrier::BarrierManagerRequest::MayHaveSnapshotBackfillingJob;
35use crate::barrier::cdc_progress::CdcProgress;
36use crate::barrier::worker::GlobalBarrierWorker;
37use crate::barrier::{
38 BackfillProgress, BarrierManagerRequest, BarrierManagerStatus, FragmentBackfillProgress,
39 RecoveryReason, UpdateDatabaseBarrierRequest, schedule,
40};
41use crate::hummock::HummockManagerRef;
42use crate::manager::sink_coordination::SinkCoordinatorManager;
43use crate::manager::{MetaSrvEnv, MetadataManager};
44use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
45
/// Front-end handle to the global barrier worker.
///
/// Most operations are forwarded to the worker as [`BarrierManagerRequest`]s over `request_tx`
/// and answered through a `oneshot` reply channel. The cluster status is read directly from
/// the shared `status` cell.
pub struct GlobalBarrierManager {
    /// Current barrier manager status. It is read by `check_status_running` and
    /// `get_recovery_status`, and is taken from the worker's context in `start`.
    status: Arc<ArcSwap<BarrierManagerStatus>>,
    /// Used to answer `get_hummock_version_id`.
    hummock_manager: HummockManagerRef,
    /// Sending half of the request channel whose receiver is handed to the barrier worker.
    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
    /// Used by `get_ddl_progress` to list creating jobs from the catalog.
    metadata_manager: MetadataManager,
}

/// Shared, reference-counted handle to a [`GlobalBarrierManager`].
pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
54
55impl GlobalBarrierManager {
56 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
58 let mut backfill_progress = {
59 let (tx, rx) = oneshot::channel();
60 self.request_tx
61 .send(BarrierManagerRequest::GetBackfillProgress(tx))
62 .context("failed to send get ddl progress request")?;
63 rx.await.context("failed to receive get ddl progress")?
64 };
65 let job_info = self
68 .metadata_manager
69 .catalog_controller
70 .list_creating_jobs(true, true, None)
71 .await?;
72 Ok(job_info
73 .into_iter()
74 .map(
75 |(job_id, definition, init_at, create_type, is_serverless_backfill)| {
76 let BackfillProgress {
77 progress,
78 backfill_type,
79 } = match &mut backfill_progress {
80 Ok(progress) => progress.remove(&job_id).unwrap_or_else(|| {
81 warn!(%job_id, "background job has no ddl progress");
82 BackfillProgress {
83 progress: "0.0%".into(),
84 backfill_type: PbBackfillType::NormalBackfill,
85 }
86 }),
87 Err(e) => BackfillProgress {
88 progress: format!("Err[{}]", e.as_report()),
89 backfill_type: PbBackfillType::NormalBackfill,
90 },
91 };
92 DdlProgress {
93 id: job_id.as_raw_id() as u64,
94 statement: definition,
95 create_type: create_type.as_str().into(),
96 initialized_at_time_millis: datetime_to_timestamp_millis(init_at),
97 progress,
98 is_serverless_backfill,
99 backfill_type: backfill_type as _,
100 }
101 },
102 )
103 .collect())
104 }
105
106 pub(crate) async fn get_fragment_backfill_progress(
107 &self,
108 ) -> MetaResult<Vec<FragmentBackfillProgress>> {
109 let (tx, rx) = oneshot::channel();
110 self.request_tx
111 .send(BarrierManagerRequest::GetFragmentBackfillProgress(tx))
112 .context("failed to send get fragment backfill progress request")?;
113 rx.await
114 .context("failed to receive get fragment backfill progress")?
115 }
116
117 pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
118 let (tx, rx) = oneshot::channel();
119 self.request_tx
120 .send(BarrierManagerRequest::GetCdcProgress(tx))
121 .context("failed to send get ddl progress request")?;
122 rx.await.context("failed to receive get ddl progress")?
123 }
124
125 pub async fn adhoc_recovery(&self) -> MetaResult<()> {
126 let (tx, rx) = oneshot::channel();
127 self.request_tx
128 .send(BarrierManagerRequest::AdhocRecovery(tx))
129 .context("failed to send adhoc recovery request")?;
130 rx.await.context("failed to wait adhoc recovery")?;
131 Ok(())
132 }
133
134 pub async fn update_database_barrier(
135 &self,
136 database_id: DatabaseId,
137 barrier_interval_ms: Option<u32>,
138 checkpoint_frequency: Option<u64>,
139 ) -> MetaResult<()> {
140 let (tx, rx) = oneshot::channel();
141 self.request_tx
142 .send(BarrierManagerRequest::UpdateDatabaseBarrier(
143 UpdateDatabaseBarrierRequest {
144 database_id,
145 barrier_interval_ms,
146 checkpoint_frequency,
147 sender: tx,
148 },
149 ))
150 .context("failed to send update database barrier request")?;
151 rx.await.context("failed to wait update database barrier")?;
152 Ok(())
153 }
154
155 pub async fn may_snapshot_backfilling_job(&self) -> MetaResult<bool> {
156 let (tx, rx) = oneshot::channel();
157 self.request_tx
158 .send(MayHaveSnapshotBackfillingJob(tx))
159 .context("failed to send has snapshot backfilling job request")?;
160 Ok(rx
161 .await
162 .context("failed to wait has snapshot backfilling job")?)
163 }
164
165 pub async fn get_hummock_version_id(&self) -> HummockVersionId {
166 self.hummock_manager.get_version_id().await
167 }
168}
169
170impl GlobalBarrierManager {
171 pub fn check_status_running(&self) -> MetaResult<()> {
173 let status = self.status.load();
174 match &**status {
175 BarrierManagerStatus::Starting
176 | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
177 bail!("The cluster is bootstrapping")
178 }
179 BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
180 Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
181 }
182 BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
183 bail!("The cluster is recovering-adhoc")
184 }
185 BarrierManagerStatus::Running => Ok(()),
186 }
187 }
188
189 pub fn get_recovery_status(&self) -> PbRecoveryStatus {
190 (&**self.status.load()).into()
191 }
192}
193
194impl GlobalBarrierManager {
195 pub async fn start(
196 scheduled_barriers: schedule::ScheduledBarriers,
197 env: MetaSrvEnv,
198 metadata_manager: MetadataManager,
199 hummock_manager: HummockManagerRef,
200 source_manager: SourceManagerRef,
201 sink_manager: SinkCoordinatorManager,
202 scale_controller: ScaleControllerRef,
203 barrier_scheduler: schedule::BarrierScheduler,
204 refresh_manager: GlobalRefreshManagerRef,
205 ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
206 let (request_tx, request_rx) = unbounded_channel();
207 let hummock_manager_clone = hummock_manager.clone();
208 let metadata_manager_clone = metadata_manager.clone();
209 let barrier_worker = GlobalBarrierWorker::new(
210 scheduled_barriers,
211 env,
212 metadata_manager,
213 hummock_manager,
214 source_manager,
215 sink_manager,
216 scale_controller,
217 request_rx,
218 barrier_scheduler,
219 refresh_manager,
220 )
221 .await;
222 let manager = Self {
223 status: barrier_worker.context.status(),
224 hummock_manager: hummock_manager_clone,
225 request_tx,
226 metadata_manager: metadata_manager_clone,
227 };
228 let (join_handle, shutdown_tx) = barrier_worker.start();
229 (Arc::new(manager), join_handle, shutdown_tx)
230 }
231}