Skip to main content

risingwave_meta/barrier/
manager.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_common::cast::datetime_to_timestamp_millis;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::DatabaseId;
24use risingwave_pb::ddl_service::{DdlProgress, PbBackfillType};
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::unbounded_channel;
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::warn;
32
33use crate::MetaResult;
34use crate::barrier::BarrierManagerRequest::MayHaveSnapshotBackfillingJob;
35use crate::barrier::cdc_progress::CdcProgress;
36use crate::barrier::worker::GlobalBarrierWorker;
37use crate::barrier::{
38    BackfillProgress, BarrierManagerRequest, BarrierManagerStatus, FragmentBackfillProgress,
39    RecoveryReason, UpdateDatabaseBarrierRequest, schedule,
40};
41use crate::hummock::HummockManagerRef;
42use crate::manager::iceberg_v3_sink::IcebergV3SinkManager;
43use crate::manager::sink_coordination::SinkCoordinatorManager;
44use crate::manager::{MetaSrvEnv, MetadataManager};
45use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
46
47pub struct GlobalBarrierManager {
48    status: Arc<ArcSwap<BarrierManagerStatus>>,
49    hummock_manager: HummockManagerRef,
50    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
51    metadata_manager: MetadataManager,
52}
53
54pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
55
56impl GlobalBarrierManager {
57    /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
58    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
59        let mut backfill_progress = {
60            let (tx, rx) = oneshot::channel();
61            self.request_tx
62                .send(BarrierManagerRequest::GetBackfillProgress(tx))
63                .context("failed to send get ddl progress request")?;
64            rx.await.context("failed to receive get ddl progress")?
65        };
66        // If not in tracker, means the first barrier not collected yet.
67        // In that case just return progress 0.
68        let job_info = self
69            .metadata_manager
70            .catalog_controller
71            .list_creating_jobs(true, true, None)
72            .await?;
73        Ok(job_info
74            .into_iter()
75            .map(
76                |(job_id, definition, init_at, create_type, is_serverless_backfill)| {
77                    let BackfillProgress {
78                        progress,
79                        backfill_type,
80                    } = match &mut backfill_progress {
81                        Ok(progress) => progress.remove(&job_id).unwrap_or_else(|| {
82                            warn!(%job_id, "background job has no ddl progress");
83                            BackfillProgress {
84                                progress: "0.0%".into(),
85                                backfill_type: PbBackfillType::NormalBackfill,
86                            }
87                        }),
88                        Err(e) => BackfillProgress {
89                            progress: format!("Err[{}]", e.as_report()),
90                            backfill_type: PbBackfillType::NormalBackfill,
91                        },
92                    };
93                    DdlProgress {
94                        id: job_id.as_raw_id() as u64,
95                        statement: definition,
96                        create_type: create_type.as_str().into(),
97                        initialized_at_time_millis: datetime_to_timestamp_millis(init_at),
98                        progress,
99                        is_serverless_backfill,
100                        backfill_type: backfill_type as _,
101                    }
102                },
103            )
104            .collect())
105    }
106
107    pub(crate) async fn get_fragment_backfill_progress(
108        &self,
109    ) -> MetaResult<Vec<FragmentBackfillProgress>> {
110        let (tx, rx) = oneshot::channel();
111        self.request_tx
112            .send(BarrierManagerRequest::GetFragmentBackfillProgress(tx))
113            .context("failed to send get fragment backfill progress request")?;
114        rx.await
115            .context("failed to receive get fragment backfill progress")?
116    }
117
118    pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
119        let (tx, rx) = oneshot::channel();
120        self.request_tx
121            .send(BarrierManagerRequest::GetCdcProgress(tx))
122            .context("failed to send get ddl progress request")?;
123        rx.await.context("failed to receive get ddl progress")?
124    }
125
126    pub async fn adhoc_recovery(&self) -> MetaResult<()> {
127        let (tx, rx) = oneshot::channel();
128        self.request_tx
129            .send(BarrierManagerRequest::AdhocRecovery(tx))
130            .context("failed to send adhoc recovery request")?;
131        rx.await.context("failed to wait adhoc recovery")?;
132        Ok(())
133    }
134
135    pub async fn update_database_barrier(
136        &self,
137        database_id: DatabaseId,
138        barrier_interval_ms: Option<u32>,
139        checkpoint_frequency: Option<u64>,
140    ) -> MetaResult<()> {
141        let (tx, rx) = oneshot::channel();
142        self.request_tx
143            .send(BarrierManagerRequest::UpdateDatabaseBarrier(
144                UpdateDatabaseBarrierRequest {
145                    database_id,
146                    barrier_interval_ms,
147                    checkpoint_frequency,
148                    sender: tx,
149                },
150            ))
151            .context("failed to send update database barrier request")?;
152        rx.await.context("failed to wait update database barrier")?;
153        Ok(())
154    }
155
156    pub async fn may_snapshot_backfilling_job(&self) -> MetaResult<bool> {
157        let (tx, rx) = oneshot::channel();
158        self.request_tx
159            .send(MayHaveSnapshotBackfillingJob(tx))
160            .context("failed to send has snapshot backfilling job request")?;
161        Ok(rx
162            .await
163            .context("failed to wait has snapshot backfilling job")?)
164    }
165
166    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
167        self.hummock_manager.get_version_id().await
168    }
169}
170
171impl GlobalBarrierManager {
172    /// Check the status of barrier manager, return error if it is not `Running`.
173    pub fn check_status_running(&self) -> MetaResult<()> {
174        let status = self.status.load();
175        match &**status {
176            BarrierManagerStatus::Starting
177            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
178                bail!("The cluster is bootstrapping")
179            }
180            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
181                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
182            }
183            BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
184                bail!("The cluster is recovering-adhoc")
185            }
186            BarrierManagerStatus::Running => Ok(()),
187        }
188    }
189
190    pub fn get_recovery_status(&self) -> PbRecoveryStatus {
191        (&**self.status.load()).into()
192    }
193}
194
195impl GlobalBarrierManager {
196    pub async fn start(
197        scheduled_barriers: schedule::ScheduledBarriers,
198        env: MetaSrvEnv,
199        metadata_manager: MetadataManager,
200        hummock_manager: HummockManagerRef,
201        source_manager: SourceManagerRef,
202        sink_manager: SinkCoordinatorManager,
203        iceberg_v3_sink_manager: IcebergV3SinkManager,
204        scale_controller: ScaleControllerRef,
205        barrier_scheduler: schedule::BarrierScheduler,
206        refresh_manager: GlobalRefreshManagerRef,
207    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
208        let (request_tx, request_rx) = unbounded_channel();
209        let hummock_manager_clone = hummock_manager.clone();
210        let metadata_manager_clone = metadata_manager.clone();
211        let barrier_worker = GlobalBarrierWorker::new(
212            scheduled_barriers,
213            env,
214            metadata_manager,
215            hummock_manager,
216            source_manager,
217            sink_manager,
218            iceberg_v3_sink_manager,
219            scale_controller,
220            request_rx,
221            barrier_scheduler,
222            refresh_manager,
223        )
224        .await;
225        let manager = Self {
226            status: barrier_worker.context.status(),
227            hummock_manager: hummock_manager_clone,
228            request_tx,
229            metadata_manager: metadata_manager_clone,
230        };
231        let (join_handle, shutdown_tx) = barrier_worker.start();
232        (Arc::new(manager), join_handle, shutdown_tx)
233    }
234}