risingwave_meta/barrier/
manager.rs1use std::collections::hash_map::Entry;
16use std::sync::Arc;
17
18use anyhow::Context;
19use arc_swap::ArcSwap;
20use risingwave_common::bail;
21use risingwave_hummock_sdk::HummockVersionId;
22use risingwave_meta_model::{CreateType, DatabaseId};
23use risingwave_pb::ddl_service::DdlProgress;
24use risingwave_pb::meta::PbRecoveryStatus;
25use tokio::sync::mpsc::unbounded_channel;
26use tokio::sync::{mpsc, oneshot};
27use tokio::task::JoinHandle;
28use tracing::warn;
29
30use crate::MetaResult;
31use crate::barrier::worker::GlobalBarrierWorker;
32use crate::barrier::{BarrierManagerRequest, BarrierManagerStatus, RecoveryReason, schedule};
33use crate::hummock::HummockManagerRef;
34use crate::manager::sink_coordination::SinkCoordinatorManager;
35use crate::manager::{MetaSrvEnv, MetadataManager};
36use crate::stream::{ScaleControllerRef, SourceManagerRef};
37
38pub struct GlobalBarrierManager {
39 status: Arc<ArcSwap<BarrierManagerStatus>>,
40 hummock_manager: HummockManagerRef,
41 request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
42 metadata_manager: MetadataManager,
43}
44
45pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
46
47impl GlobalBarrierManager {
48 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
50 let mut ddl_progress = {
51 let (tx, rx) = oneshot::channel();
52 self.request_tx
53 .send(BarrierManagerRequest::GetDdlProgress(tx))
54 .context("failed to send get ddl progress request")?;
55 rx.await.context("failed to receive get ddl progress")?
56 };
57 let job_info = self
60 .metadata_manager
61 .catalog_controller
62 .list_background_creating_jobs(true)
63 .await?;
64 for (job_id, definition, _init_at) in job_info {
65 if let Entry::Vacant(e) = ddl_progress.entry(job_id as _) {
66 warn!(job_id, "background job has no ddl progress");
67 e.insert(DdlProgress {
68 id: job_id as u64,
69 statement: definition,
70 create_type: CreateType::Background.as_str().into(),
71 progress: "0.0%".into(),
72 });
73 }
74 }
75
76 Ok(ddl_progress.into_values().collect())
77 }
78
79 pub async fn adhoc_recovery(&self) -> MetaResult<()> {
80 let (tx, rx) = oneshot::channel();
81 self.request_tx
82 .send(BarrierManagerRequest::AdhocRecovery(tx))
83 .context("failed to send adhoc recovery request")?;
84 rx.await.context("failed to wait adhoc recovery")?;
85 Ok(())
86 }
87
88 pub async fn update_database_barrier(
89 &self,
90 database_id: DatabaseId,
91 barrier_interval_ms: Option<u32>,
92 checkpoint_frequency: Option<u64>,
93 ) -> MetaResult<()> {
94 let (tx, rx) = oneshot::channel();
95 self.request_tx
96 .send(BarrierManagerRequest::UpdateDatabaseBarrier {
97 database_id: (database_id as u32).into(),
98 barrier_interval_ms,
99 checkpoint_frequency,
100 sender: tx,
101 })
102 .context("failed to send update database barrier request")?;
103 rx.await.context("failed to wait update database barrier")?;
104 Ok(())
105 }
106
107 pub async fn get_hummock_version_id(&self) -> HummockVersionId {
108 self.hummock_manager.get_version_id().await
109 }
110}
111
112impl GlobalBarrierManager {
113 pub fn check_status_running(&self) -> MetaResult<()> {
115 let status = self.status.load();
116 match &**status {
117 BarrierManagerStatus::Starting
118 | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
119 bail!("The cluster is bootstrapping")
120 }
121 BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
122 Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
123 }
124 BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
125 bail!("The cluster is recovering-adhoc")
126 }
127 BarrierManagerStatus::Running => Ok(()),
128 }
129 }
130
131 pub fn get_recovery_status(&self) -> PbRecoveryStatus {
132 (&**self.status.load()).into()
133 }
134}
135
136impl GlobalBarrierManager {
137 pub async fn start(
138 scheduled_barriers: schedule::ScheduledBarriers,
139 env: MetaSrvEnv,
140 metadata_manager: MetadataManager,
141 hummock_manager: HummockManagerRef,
142 source_manager: SourceManagerRef,
143 sink_manager: SinkCoordinatorManager,
144 scale_controller: ScaleControllerRef,
145 ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
146 let (request_tx, request_rx) = unbounded_channel();
147 let hummock_manager_clone = hummock_manager.clone();
148 let metadata_manager_clone = metadata_manager.clone();
149 let barrier_worker = GlobalBarrierWorker::new(
150 scheduled_barriers,
151 env,
152 metadata_manager,
153 hummock_manager,
154 source_manager,
155 sink_manager,
156 scale_controller,
157 request_rx,
158 )
159 .await;
160 let manager = Self {
161 status: barrier_worker.context.status(),
162 hummock_manager: hummock_manager_clone,
163 request_tx,
164 metadata_manager: metadata_manager_clone,
165 };
166 let (join_handle, shutdown_tx) = barrier_worker.start();
167 (Arc::new(manager), join_handle, shutdown_tx)
168 }
169}