risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use risingwave_common::id::JobId;
23use risingwave_common::must_match;
24use risingwave_common::util::deployment::Deployment;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::id::{DatabaseId, PartialGraphId};
27use risingwave_pb::stream_service::barrier_complete_response::{
28 PbListFinishedSource, PbLoadFinishedSource,
29};
30use tokio::task::JoinHandle;
31
32use crate::barrier::checkpoint::CheckpointControl;
33use crate::barrier::context::GlobalBarrierWorkerContext;
34use crate::barrier::info::BarrierInfo;
35use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
36use crate::barrier::progress::TrackingJob;
37use crate::barrier::rpc::from_partial_graph_id;
38use crate::barrier::schedule::PeriodicBarriers;
39use crate::hummock::CommitEpochInfo;
40use crate::manager::MetaSrvEnv;
41use crate::rpc::metrics::GLOBAL_META_METRICS;
42use crate::{MetaError, MetaResult};
43
44pub(super) enum CompletingTask {
45 None,
46 Completing {
47 #[expect(clippy::type_complexity)]
48 epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
50
51 join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
55 },
56 #[expect(dead_code)]
57 Err(MetaError),
58}
59
60#[derive(Default)]
62pub(super) struct CompleteBarrierTask {
63 pub(super) commit_info: CommitEpochInfo,
64 pub(super) finished_jobs: Vec<TrackingJob>,
65 pub(super) finished_cdc_table_backfill: Vec<JobId>,
66 pub(super) epoch_infos: HashMap<PartialGraphId, PartialGraphBarrierInfo>,
68 pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
70 pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
72 pub(super) refresh_finished_table_job_ids: Vec<JobId>,
74}
75
76impl CompleteBarrierTask {
77 #[expect(clippy::type_complexity)]
78 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
79 let mut epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> =
80 HashMap::new();
81 for (partial_graph_id, info) in &self.epoch_infos {
82 let (database_id, creating_job_id) = from_partial_graph_id(*partial_graph_id);
83 let epoch = info.barrier_info.prev_epoch();
84 let (database, jobs) = epochs_to_ack.entry(database_id).or_default();
85 if let Some(job_id) = creating_job_id {
86 jobs.push((job_id, epoch));
87 } else {
88 *database = Some(epoch);
89 }
90 }
91 epochs_to_ack
92 }
93}
94
95impl CompleteBarrierTask {
96 pub(super) async fn complete_barrier(
97 self,
98 context: &impl GlobalBarrierWorkerContext,
99 env: MetaSrvEnv,
100 ) -> MetaResult<HummockVersionStats> {
101 let mut notifiers = Vec::new();
102 let result: MetaResult<HummockVersionStats> = try {
103 let wait_commit_timer = GLOBAL_META_METRICS
104 .barrier_wait_commit_latency
105 .start_timer();
106 let version_stats = context.commit_epoch(self.commit_info).await?;
107
108 if !self.list_finished_source_ids.is_empty() {
115 context
116 .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
117 .await?;
118 }
119
120 if !self.load_finished_source_ids.is_empty() {
123 context
124 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
125 .await?;
126 }
127
128 if !self.refresh_finished_table_job_ids.is_empty() {
130 context
131 .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
132 .await?;
133 }
134
135 for (partial_graph_id, info) in self.epoch_infos {
136 let (database_id, job_id) = from_partial_graph_id(partial_graph_id);
137 let command_name = info.post_collect_command.command_name().to_owned();
138 let elapsed_secs = info.elapsed_secs();
139 notifiers.extend(info.notifiers);
140 context
141 .post_collect_command(info.post_collect_command)
142 .await?;
143 if job_id.is_none() {
144 Self::report_complete_event(
145 &env,
146 database_id,
147 elapsed_secs,
148 &info.barrier_info,
149 command_name,
150 );
151 GLOBAL_META_METRICS
152 .last_committed_barrier_time
153 .with_label_values(&[database_id.to_string().as_str()])
154 .set(info.barrier_info.curr_epoch.value().as_unix_secs() as i64);
155 }
156 }
157
158 wait_commit_timer.observe_duration();
159 version_stats
160 };
161
162 let version_stats = {
163 let version_stats = match result {
164 Ok(version_stats) => version_stats,
165 Err(e) => {
166 for notifier in notifiers {
167 notifier.notify_collection_failed(e.clone());
168 }
169 return Err(e);
170 }
171 };
172 notifiers.into_iter().for_each(|notifier| {
173 notifier.notify_collected();
174 });
175 try_join_all(
176 self.finished_jobs
177 .into_iter()
178 .map(|finished_job| context.finish_creating_job(finished_job)),
179 )
180 .await?;
181 try_join_all(
182 self.finished_cdc_table_backfill
183 .into_iter()
184 .map(|job_id| context.finish_cdc_table_backfill(job_id)),
185 )
186 .await?;
187 version_stats
188 };
189
190 Ok(version_stats)
191 }
192}
193
194impl CompleteBarrierTask {
195 fn report_complete_event(
196 env: &MetaSrvEnv,
197 database_id: DatabaseId,
198 duration_sec: f64,
199 barrier_info: &BarrierInfo,
200 command: String,
201 ) {
202 use risingwave_pb::meta::event_log;
204 let event = event_log::EventBarrierComplete {
205 prev_epoch: barrier_info.prev_epoch(),
206 cur_epoch: barrier_info.curr_epoch(),
207 duration_sec,
208 command,
209 barrier_kind: barrier_info.kind.as_str_name().to_owned(),
210 database_id: database_id.as_raw_id(),
211 };
212 if cfg!(debug_assertions) || Deployment::current().is_ci() {
213 if duration_sec > 5.0 {
215 tracing::warn!(event = ?event,"high barrier latency observed!")
216 }
217 }
218 env.event_log_manager_ref()
219 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
220 }
221}
222
223pub(super) struct BarrierCompleteOutput {
224 #[expect(clippy::type_complexity)]
225 pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
227 pub hummock_version_stats: HummockVersionStats,
228}
229
230impl CompletingTask {
231 pub(super) fn next_completed_barrier<'a>(
232 &'a mut self,
233 periodic_barriers: &mut PeriodicBarriers,
234 checkpoint_control: &mut CheckpointControl,
235 partial_graph_manager: &mut PartialGraphManager,
236 context: &Arc<impl GlobalBarrierWorkerContext>,
237 env: &MetaSrvEnv,
238 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
239 if let CompletingTask::None = self
242 && let Some(task) = checkpoint_control
243 .next_complete_barrier_task(periodic_barriers, partial_graph_manager)
244 {
245 {
246 let epochs_to_ack = task.epochs_to_ack();
247 let context = context.clone();
248 let await_tree_reg = env.await_tree_reg().clone();
249 let env = env.clone();
250
251 let fut = async move { task.complete_barrier(&*context, env).await };
252 let fut = await_tree_reg
253 .register_derived_root("Barrier Completion Task")
254 .instrument(fut);
255 let join_handle = tokio::spawn(fut);
256
257 *self = CompletingTask::Completing {
258 epochs_to_ack,
259 join_handle,
260 };
261 }
262 }
263
264 async move {
265 if !matches!(self, CompletingTask::Completing { .. }) {
266 return pending().await;
267 };
268 self.next_completed_barrier_inner().await
269 }
270 }
271
272 #[await_tree::instrument]
273 pub(super) async fn wait_completing_task(
274 &mut self,
275 ) -> MetaResult<Option<BarrierCompleteOutput>> {
276 match self {
277 CompletingTask::None => Ok(None),
278 CompletingTask::Completing { .. } => {
279 self.next_completed_barrier_inner().await.map(Some)
280 }
281 CompletingTask::Err(_) => {
282 unreachable!("should not be called on previous err")
283 }
284 }
285 }
286
287 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
288 let CompletingTask::Completing { join_handle, .. } = self else {
289 unreachable!()
290 };
291
292 {
293 {
294 let join_result: MetaResult<_> = try {
295 join_handle
296 .await
297 .context("failed to join completing command")??
298 };
299 let next_completing_command_status = if let Err(e) = &join_result {
302 CompletingTask::Err(e.clone())
303 } else {
304 CompletingTask::None
305 };
306 let completed_command = replace(self, next_completing_command_status);
307 let hummock_version_stats = join_result?;
308
309 must_match!(completed_command, CompletingTask::Completing {
310 epochs_to_ack,
311 ..
312 } => {
313 Ok(BarrierCompleteOutput {
314 epochs_to_ack,
315 hummock_version_stats,
316 })
317 })
318 }
319 }
320 }
321}