risingwave_meta/barrier/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use anyhow::Context;
20use arc_swap::ArcSwap;
21use risingwave_common::bail;
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_meta_model::{CreateType, DatabaseId};
24use risingwave_pb::ddl_service::DdlProgress;
25use risingwave_pb::id::JobId;
26use risingwave_pb::meta::PbRecoveryStatus;
27use tokio::sync::mpsc::unbounded_channel;
28use tokio::sync::{mpsc, oneshot};
29use tokio::task::JoinHandle;
30use tracing::warn;
31
32use crate::MetaResult;
33use crate::barrier::cdc_progress::CdcProgress;
34use crate::barrier::worker::GlobalBarrierWorker;
35use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
36use crate::hummock::HummockManagerRef;
37use crate::manager::sink_coordination::SinkCoordinatorManager;
38use crate::manager::{MetaSrvEnv, MetadataManager};
39use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
40
/// Client-side handle to the global barrier worker.
///
/// Requests are forwarded to the worker over an unbounded channel, and the
/// current status is read from an [`ArcSwap`] shared with the worker's context.
pub struct GlobalBarrierManager {
    /// Barrier manager status, shared with the barrier worker's context (set up in `start`).
    status: Arc<ArcSwap<BarrierManagerStatus>>,
    /// Used to read the current hummock version id.
    hummock_manager: HummockManagerRef,
    /// Sending end of the request channel; the receiving end is handed to the barrier worker.
    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
    /// Used to list background creating jobs when serving DDL progress.
    metadata_manager: MetadataManager,
}

/// Shared reference to a [`GlobalBarrierManager`].
pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
49
50impl GlobalBarrierManager {
51    /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
52    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
53        let mut ddl_progress = {
54            let (tx, rx) = oneshot::channel();
55            self.request_tx
56                .send(BarrierManagerRequest::GetDdlProgress(tx))
57                .context("failed to send get ddl progress request")?;
58            rx.await.context("failed to receive get ddl progress")?
59        };
60        // If not in tracker, means the first barrier not collected yet.
61        // In that case just return progress 0.
62        let job_info = self
63            .metadata_manager
64            .catalog_controller
65            .list_background_creating_jobs(true, None)
66            .await?;
67        for (job_id, definition, _init_at) in job_info {
68            if let Entry::Vacant(e) = ddl_progress.entry(job_id) {
69                warn!(%job_id, "background job has no ddl progress");
70                e.insert(DdlProgress {
71                    id: job_id.as_raw_id() as u64,
72                    statement: definition,
73                    create_type: CreateType::Background.as_str().into(),
74                    progress: "0.0%".into(),
75                });
76            }
77        }
78
79        Ok(ddl_progress.into_values().collect())
80    }
81
82    pub async fn get_cdc_progress(&self) -> MetaResult<HashMap<JobId, CdcProgress>> {
83        let (tx, rx) = oneshot::channel();
84        self.request_tx
85            .send(BarrierManagerRequest::GetCdcProgress(tx))
86            .context("failed to send get ddl progress request")?;
87        Ok(rx.await.context("failed to receive get ddl progress")?)
88    }
89
90    pub async fn adhoc_recovery(&self) -> MetaResult<()> {
91        let (tx, rx) = oneshot::channel();
92        self.request_tx
93            .send(BarrierManagerRequest::AdhocRecovery(tx))
94            .context("failed to send adhoc recovery request")?;
95        rx.await.context("failed to wait adhoc recovery")?;
96        Ok(())
97    }
98
99    pub async fn update_database_barrier(
100        &self,
101        database_id: DatabaseId,
102        barrier_interval_ms: Option<u32>,
103        checkpoint_frequency: Option<u64>,
104    ) -> MetaResult<()> {
105        let (tx, rx) = oneshot::channel();
106        self.request_tx
107            .send(BarrierManagerRequest::UpdateDatabaseBarrier {
108                database_id,
109                barrier_interval_ms,
110                checkpoint_frequency,
111                sender: tx,
112            })
113            .context("failed to send update database barrier request")?;
114        rx.await.context("failed to wait update database barrier")?;
115        Ok(())
116    }
117
118    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
119        self.hummock_manager.get_version_id().await
120    }
121}
122
123impl GlobalBarrierManager {
124    /// Check the status of barrier manager, return error if it is not `Running`.
125    pub fn check_status_running(&self) -> MetaResult<()> {
126        let status = self.status.load();
127        match &**status {
128            BarrierManagerStatus::Starting
129            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
130                bail!("The cluster is bootstrapping")
131            }
132            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
133                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
134            }
135            BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
136                bail!("The cluster is recovering-adhoc")
137            }
138            BarrierManagerStatus::Running => Ok(()),
139        }
140    }
141
142    pub fn get_recovery_status(&self) -> PbRecoveryStatus {
143        (&**self.status.load()).into()
144    }
145}
146
147impl GlobalBarrierManager {
148    pub async fn start(
149        scheduled_barriers: schedule::ScheduledBarriers,
150        env: MetaSrvEnv,
151        metadata_manager: MetadataManager,
152        hummock_manager: HummockManagerRef,
153        source_manager: SourceManagerRef,
154        sink_manager: SinkCoordinatorManager,
155        scale_controller: ScaleControllerRef,
156        barrier_scheduler: schedule::BarrierScheduler,
157        refresh_manager: GlobalRefreshManagerRef,
158    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
159        let (request_tx, request_rx) = unbounded_channel();
160        let hummock_manager_clone = hummock_manager.clone();
161        let metadata_manager_clone = metadata_manager.clone();
162        let barrier_worker = GlobalBarrierWorker::new(
163            scheduled_barriers,
164            env,
165            metadata_manager,
166            hummock_manager,
167            source_manager,
168            sink_manager,
169            scale_controller,
170            request_rx,
171            barrier_scheduler,
172            refresh_manager,
173        )
174        .await;
175        let manager = Self {
176            status: barrier_worker.context.status(),
177            hummock_manager: hummock_manager_clone,
178            request_tx,
179            metadata_manager: metadata_manager_clone,
180        };
181        let (join_handle, shutdown_tx) = barrier_worker.start();
182        (Arc::new(manager), join_handle, shutdown_tx)
183    }
184}