risingwave_meta/barrier/manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_meta_model::{CreateType, DatabaseId};
23use risingwave_pb::ddl_service::DdlProgress;
24use risingwave_pb::meta::PbRecoveryStatus;
25use tokio::sync::mpsc::unbounded_channel;
26use tokio::sync::{mpsc, oneshot};
27use tokio::task::JoinHandle;
28use tracing::warn;
29
30use crate::MetaResult;
31use crate::barrier::worker::GlobalBarrierWorker;
32use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
33use crate::hummock::HummockManagerRef;
34use crate::manager::sink_coordination::SinkCoordinatorManager;
35use crate::manager::{MetaSrvEnv, MetadataManager};
36use crate::stream::{ScaleControllerRef, SourceManagerRef};
37
38pub struct GlobalBarrierManager {
39    status: Arc<ArcSwap<BarrierManagerStatus>>,
40    hummock_manager: HummockManagerRef,
41    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
42    metadata_manager: MetadataManager,
43}
44
45pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
46
47impl GlobalBarrierManager {
48    /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
49    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
50        let mut ddl_progress = {
51            let (tx, rx) = oneshot::channel();
52            self.request_tx
53                .send(BarrierManagerRequest::GetDdlProgress(tx))
54                .context("failed to send get ddl progress request")?;
55            rx.await.context("failed to receive get ddl progress")?
56        };
57        // If not in tracker, means the first barrier not collected yet.
58        // In that case just return progress 0.
59        let job_info = self
60            .metadata_manager
61            .catalog_controller
62            .list_background_creating_jobs(true)
63            .await?;
64        for (job_id, definition, _init_at) in job_info {
65            if let Entry::Vacant(e) = ddl_progress.entry(job_id as _) {
66                warn!(job_id, "background job has no ddl progress");
67                e.insert(DdlProgress {
68                    id: job_id as u64,
69                    statement: definition,
70                    create_type: CreateType::Background.as_str().into(),
71                    progress: "0.0%".into(),
72                });
73            }
74        }
75
76        Ok(ddl_progress.into_values().collect())
77    }
78
79    pub async fn adhoc_recovery(&self) -> MetaResult<()> {
80        let (tx, rx) = oneshot::channel();
81        self.request_tx
82            .send(BarrierManagerRequest::AdhocRecovery(tx))
83            .context("failed to send adhoc recovery request")?;
84        rx.await.context("failed to wait adhoc recovery")?;
85        Ok(())
86    }
87
88    pub async fn update_database_barrier(
89        &self,
90        database_id: DatabaseId,
91        barrier_interval_ms: Option<u32>,
92        checkpoint_frequency: Option<u64>,
93    ) -> MetaResult<()> {
94        let (tx, rx) = oneshot::channel();
95        self.request_tx
96            .send(BarrierManagerRequest::UpdateDatabaseBarrier {
97                database_id: (database_id as u32).into(),
98                barrier_interval_ms,
99                checkpoint_frequency,
100                sender: tx,
101            })
102            .context("failed to send update database barrier request")?;
103        rx.await.context("failed to wait update database barrier")?;
104        Ok(())
105    }
106
107    pub async fn get_hummock_version_id(&self) -> HummockVersionId {
108        self.hummock_manager.get_version_id().await
109    }
110}
111
112impl GlobalBarrierManager {
113    /// Check the status of barrier manager, return error if it is not `Running`.
114    pub fn check_status_running(&self) -> MetaResult<()> {
115        let status = self.status.load();
116        match &**status {
117            BarrierManagerStatus::Starting
118            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
119                bail!("The cluster is bootstrapping")
120            }
121            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
122                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
123            }
124            BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
125                bail!("The cluster is recovering-adhoc")
126            }
127            BarrierManagerStatus::Running => Ok(()),
128        }
129    }
130
131    pub fn get_recovery_status(&self) -> PbRecoveryStatus {
132        (&**self.status.load()).into()
133    }
134}
135
136impl GlobalBarrierManager {
137    pub async fn start(
138        scheduled_barriers: schedule::ScheduledBarriers,
139        env: MetaSrvEnv,
140        metadata_manager: MetadataManager,
141        hummock_manager: HummockManagerRef,
142        source_manager: SourceManagerRef,
143        sink_manager: SinkCoordinatorManager,
144        scale_controller: ScaleControllerRef,
145    ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
146        let (request_tx, request_rx) = unbounded_channel();
147        let hummock_manager_clone = hummock_manager.clone();
148        let metadata_manager_clone = metadata_manager.clone();
149        let barrier_worker = GlobalBarrierWorker::new(
150            scheduled_barriers,
151            env,
152            metadata_manager,
153            hummock_manager,
154            source_manager,
155            sink_manager,
156            scale_controller,
157            request_rx,
158        )
159        .await;
160        let manager = Self {
161            status: barrier_worker.context.status(),
162            hummock_manager: hummock_manager_clone,
163            request_tx,
164            metadata_manager: metadata_manager_clone,
165        };
166        let (join_handle, shutdown_tx) = barrier_worker.start();
167        (Arc::new(manager), join_handle, shutdown_tx)
168    }
169}