risingwave_meta/barrier/
manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_common::cast::datetime_to_timestamp_millis;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::DatabaseId;
24use risingwave_pb::ddl_service::{DdlProgress, PbBackfillType};
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::unbounded_channel;
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::warn;
32
33use crate::MetaResult;
34use crate::barrier::BarrierManagerRequest::MayHaveSnapshotBackfillingJob;
35use crate::barrier::cdc_progress::CdcProgress;
36use crate::barrier::worker::GlobalBarrierWorker;
37use crate::barrier::{
38    BackfillProgress, BarrierManagerRequest, BarrierManagerStatus, FragmentBackfillProgress,
39    RecoveryReason, UpdateDatabaseBarrierRequest, schedule,
40};
41use crate::hummock::HummockManagerRef;
42use crate::manager::sink_coordination::SinkCoordinatorManager;
43use crate::manager::{MetaSrvEnv, MetadataManager};
44use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
45
46pub struct GlobalBarrierManager {
47    status: Arc<ArcSwap<BarrierManagerStatus>>,
48    hummock_manager: HummockManagerRef,
49    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
50    metadata_manager: MetadataManager,
51}
52
53pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
54
55impl GlobalBarrierManager {
56    /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
57    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
58        let mut backfill_progress = {
59            let (tx, rx) = oneshot::channel();
60            self.request_tx
61                .send(BarrierManagerRequest::GetBackfillProgress(tx))
62                .context("failed to send get ddl progress request")?;
63            rx.await.context("failed to receive get ddl progress")?
64        };
65        // If not in tracker, means the first barrier not collected yet.
66        // In that case just return progress 0.
67        let job_info = self
68            .metadata_manager
69            .catalog_controller
70            .list_creating_jobs(true, true, None)
71            .await?;
72        Ok(job_info
73            .into_iter()
74            .map(
75                |(job_id, definition, init_at, create_type, is_serverless_backfill)| {
76                    let BackfillProgress {
77                        progress,
78                        backfill_type,
79                    } = match &mut backfill_progress {
80                        Ok(progress) => progress.remove(&job_id).unwrap_or_else(|| {
81                            warn!(%job_id, "background job has no ddl progress");
82                            BackfillProgress {
83                                progress: "0.0%".into(),
84                                backfill_type: PbBackfillType::NormalBackfill,
85                            }
86                        }),
87                        Err(e) => BackfillProgress {
88                            progress: format!("Err[{}]", e.as_report()),
89                            backfill_type: PbBackfillType::NormalBackfill,
90                        },
91                    };
92                    DdlProgress {
93                        id: job_id.as_raw_id() as u64,
94                        statement: definition,
95                        create_type: create_type.as_str().into(),
96                        initialized_at_time_millis: datetime_to_timestamp_millis(init_at),
97                        progress,
98                        is_serverless_backfill,
99                        backfill_type: backfill_type as _,
100                    }
101                },
102            )
103            .collect())
104    }
105
106    pub(crate) async fn get_fragment_backfill_progress(
107        &self,
108    ) -> MetaResult<Vec<FragmentBackfillProgress>> {
109        let (tx, rx) = oneshot::channel();
110        self.request_tx
111            .send(BarrierManagerRequest::GetFragmentBackfillProgress(tx))
112            .context("failed to send get fragment backfill progress request")?;
113        rx.await
114            .context("failed to receive get fragment backfill progress")?
115    }
116
117    pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
118        let (tx, rx) = oneshot::channel();
119        self.request_tx
120            .send(BarrierManagerRequest::GetCdcProgress(tx))
121            .context("failed to send get ddl progress request")?;
122        rx.await.context("failed to receive get ddl progress")?
123    }
124
125    pub async fn adhoc_recovery(&self) -> MetaResult<()> {
126        let (tx, rx) = oneshot::channel();
127        self.request_tx
128            .send(BarrierManagerRequest::AdhocRecovery(tx))
129            .context("failed to send adhoc recovery request")?;
130        rx.await.context("failed to wait adhoc recovery")?;
131        Ok(())
132    }
133
134    pub async fn update_database_barrier(
135        &self,
136        database_id: DatabaseId,
137        barrier_interval_ms: Option<u32>,
138        checkpoint_frequency: Option<u64>,
139    ) -> MetaResult<()> {
140        let (tx, rx) = oneshot::channel();
141        self.request_tx
142            .send(BarrierManagerRequest::UpdateDatabaseBarrier(
143                UpdateDatabaseBarrierRequest {
144                    database_id,
145                    barrier_interval_ms,
146                    checkpoint_frequency,
147                    sender: tx,
148                },
149            ))
150            .context("failed to send update database barrier request")?;
151        rx.await.context("failed to wait update database barrier")?;
152        Ok(())
153    }
154
155    pub async fn may_snapshot_backfilling_job(&self) -> MetaResult<bool> {
156        let (tx, rx) = oneshot::channel();
157        self.request_tx
158            .send(MayHaveSnapshotBackfillingJob(tx))
159            .context("failed to send has snapshot backfilling job request")?;
160        Ok(rx
161            .await
162            .context("failed to wait has snapshot backfilling job")?)
163    }
164
165    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
166        self.hummock_manager.get_version_id().await
167    }
168}
169
170impl GlobalBarrierManager {
171    /// Check the status of barrier manager, return error if it is not `Running`.
172    pub fn check_status_running(&self) -> MetaResult<()> {
173        let status = self.status.load();
174        match &**status {
175            BarrierManagerStatus::Starting
176            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
177                bail!("The cluster is bootstrapping")
178            }
179            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
180                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
181            }
182            BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
183                bail!("The cluster is recovering-adhoc")
184            }
185            BarrierManagerStatus::Running => Ok(()),
186        }
187    }
188
189    pub fn get_recovery_status(&self) -> PbRecoveryStatus {
190        (&**self.status.load()).into()
191    }
192}
193
194impl GlobalBarrierManager {
195    pub async fn start(
196        scheduled_barriers: schedule::ScheduledBarriers,
197        env: MetaSrvEnv,
198        metadata_manager: MetadataManager,
199        hummock_manager: HummockManagerRef,
200        source_manager: SourceManagerRef,
201        sink_manager: SinkCoordinatorManager,
202        scale_controller: ScaleControllerRef,
203        barrier_scheduler: schedule::BarrierScheduler,
204        refresh_manager: GlobalRefreshManagerRef,
205    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
206        let (request_tx, request_rx) = unbounded_channel();
207        let hummock_manager_clone = hummock_manager.clone();
208        let metadata_manager_clone = metadata_manager.clone();
209        let barrier_worker = GlobalBarrierWorker::new(
210            scheduled_barriers,
211            env,
212            metadata_manager,
213            hummock_manager,
214            source_manager,
215            sink_manager,
216            scale_controller,
217            request_rx,
218            barrier_scheduler,
219            refresh_manager,
220        )
221        .await;
222        let manager = Self {
223            status: barrier_worker.context.status(),
224            hummock_manager: hummock_manager_clone,
225            request_tx,
226            metadata_manager: metadata_manager_clone,
227        };
228        let (join_handle, shutdown_tx) = barrier_worker.start();
229        (Arc::new(manager), join_handle, shutdown_tx)
230    }
231}