risingwave_meta/barrier/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::meta::PbRecoveryStatus;
24use tokio::sync::mpsc::unbounded_channel;
25use tokio::sync::{mpsc, oneshot};
26use tokio::task::JoinHandle;
27use tracing::warn;
28
29use crate::MetaResult;
30use crate::barrier::worker::GlobalBarrierWorker;
31use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
32use crate::hummock::HummockManagerRef;
33use crate::manager::sink_coordination::SinkCoordinatorManager;
34use crate::manager::{MetaSrvEnv, MetadataManager};
35use crate::stream::{ScaleControllerRef, SourceManagerRef};
36
37pub struct GlobalBarrierManager {
38    status: Arc<ArcSwap<BarrierManagerStatus>>,
39    hummock_manager: HummockManagerRef,
40    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
41    metadata_manager: MetadataManager,
42}
43
44pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
45
46impl GlobalBarrierManager {
47    /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
48    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
49        let mut ddl_progress = {
50            let (tx, rx) = oneshot::channel();
51            self.request_tx
52                .send(BarrierManagerRequest::GetDdlProgress(tx))
53                .context("failed to send get ddl progress request")?;
54            rx.await.context("failed to receive get ddl progress")?
55        };
56        // If not in tracker, means the first barrier not collected yet.
57        // In that case just return progress 0.
58        let mviews = self
59            .metadata_manager
60            .catalog_controller
61            .list_background_creating_mviews(true)
62            .await?;
63        for mview in mviews {
64            if let Entry::Vacant(e) = ddl_progress.entry(mview.table_id as _) {
65                warn!(
66                    job_id = mview.table_id,
67                    "background job has no ddl progress"
68                );
69                e.insert(DdlProgress {
70                    id: mview.table_id as u64,
71                    statement: mview.definition,
72                    progress: "0.0%".into(),
73                });
74            }
75        }
76
77        Ok(ddl_progress.into_values().collect())
78    }
79
80    pub async fn adhoc_recovery(&self) -> MetaResult<()> {
81        let (tx, rx) = oneshot::channel();
82        self.request_tx
83            .send(BarrierManagerRequest::AdhocRecovery(tx))
84            .context("failed to send adhoc recovery request")?;
85        rx.await.context("failed to wait adhoc recovery")?;
86        Ok(())
87    }
88
89    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
90        self.hummock_manager.get_version_id().await
91    }
92}
93
94impl GlobalBarrierManager {
95    /// Check the status of barrier manager, return error if it is not `Running`.
96    pub fn check_status_running(&self) -> MetaResult<()> {
97        let status = self.status.load();
98        match &**status {
99            BarrierManagerStatus::Starting
100            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
101                bail!("The cluster is bootstrapping")
102            }
103            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
104                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
105            }
106            BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
107                bail!("The cluster is recovering-adhoc")
108            }
109            BarrierManagerStatus::Running => Ok(()),
110        }
111    }
112
113    pub fn get_recovery_status(&self) -> PbRecoveryStatus {
114        (&**self.status.load()).into()
115    }
116}
117
118impl GlobalBarrierManager {
119    pub async fn start(
120        scheduled_barriers: schedule::ScheduledBarriers,
121        env: MetaSrvEnv,
122        metadata_manager: MetadataManager,
123        hummock_manager: HummockManagerRef,
124        source_manager: SourceManagerRef,
125        sink_manager: SinkCoordinatorManager,
126        scale_controller: ScaleControllerRef,
127    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
128        let (request_tx, request_rx) = unbounded_channel();
129        let hummock_manager_clone = hummock_manager.clone();
130        let metadata_manager_clone = metadata_manager.clone();
131        let barrier_worker = GlobalBarrierWorker::new(
132            scheduled_barriers,
133            env,
134            metadata_manager,
135            hummock_manager,
136            source_manager,
137            sink_manager,
138            scale_controller,
139            request_rx,
140        )
141        .await;
142        let manager = Self {
143            status: barrier_worker.context.status(),
144            hummock_manager: hummock_manager_clone,
145            request_tx,
146            metadata_manager: metadata_manager_clone,
147        };
148        let (join_handle, shutdown_tx) = barrier_worker.start();
149        (Arc::new(manager), join_handle, shutdown_tx)
150    }
151}