risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::id::JobId;
25use risingwave_common::must_match;
26use risingwave_common::util::deployment::Deployment;
27use risingwave_pb::hummock::HummockVersionStats;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 PbListFinishedSource, PbLoadFinishedSource,
30};
31use tokio::task::JoinHandle;
32
33use crate::barrier::checkpoint::CheckpointControl;
34use crate::barrier::command::CommandContext;
35use crate::barrier::context::GlobalBarrierWorkerContext;
36use crate::barrier::notifier::Notifier;
37use crate::barrier::progress::TrackingJob;
38use crate::barrier::rpc::ControlStreamManager;
39use crate::barrier::schedule::PeriodicBarriers;
40use crate::hummock::CommitEpochInfo;
41use crate::manager::MetaSrvEnv;
42use crate::rpc::metrics::GLOBAL_META_METRICS;
43use crate::{MetaError, MetaResult};
44
45pub(super) enum CompletingTask {
46 None,
47 Completing {
48 #[expect(clippy::type_complexity)]
49 epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
51
52 join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
56 },
57 #[expect(dead_code)]
58 Err(MetaError),
59}
60
61#[derive(Default)]
63pub(super) struct CompleteBarrierTask {
64 pub(super) commit_info: CommitEpochInfo,
65 pub(super) finished_jobs: Vec<TrackingJob>,
66 pub(super) finished_cdc_table_backfill: Vec<JobId>,
67 pub(super) notifiers: Vec<Notifier>,
68 #[expect(clippy::type_complexity)]
70 pub(super) epoch_infos:
71 HashMap<DatabaseId, (Option<(CommandContext, HistogramTimer)>, Vec<(JobId, u64)>)>,
72 pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
74 pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
76 pub(super) refresh_finished_table_job_ids: Vec<JobId>,
78}
79
80impl CompleteBarrierTask {
81 #[expect(clippy::type_complexity)]
82 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
83 self.epoch_infos
84 .iter()
85 .map(|(database_id, (command_context, creating_job_epochs))| {
86 (
87 *database_id,
88 (
89 command_context
90 .as_ref()
91 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
92 creating_job_epochs.clone(),
93 ),
94 )
95 })
96 .collect()
97 }
98}
99
100impl CompleteBarrierTask {
101 pub(super) async fn complete_barrier(
102 self,
103 context: &impl GlobalBarrierWorkerContext,
104 env: MetaSrvEnv,
105 ) -> MetaResult<HummockVersionStats> {
106 let result: MetaResult<HummockVersionStats> = try {
107 let wait_commit_timer = GLOBAL_META_METRICS
108 .barrier_wait_commit_latency
109 .start_timer();
110 let version_stats = context.commit_epoch(self.commit_info).await?;
111
112 if !self.list_finished_source_ids.is_empty() {
119 context
120 .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
121 .await?;
122 }
123
124 if !self.load_finished_source_ids.is_empty() {
127 context
128 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
129 .await?;
130 }
131
132 if !self.refresh_finished_table_job_ids.is_empty() {
134 context
135 .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
136 .await?;
137 }
138
139 for command_ctx in self
140 .epoch_infos
141 .values()
142 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
143 {
144 context.post_collect_command(command_ctx).await?;
145 }
146
147 wait_commit_timer.observe_duration();
148 version_stats
149 };
150
151 let version_stats = {
152 let version_stats = match result {
153 Ok(version_stats) => version_stats,
154 Err(e) => {
155 for notifier in self.notifiers {
156 notifier.notify_collection_failed(e.clone());
157 }
158 return Err(e);
159 }
160 };
161 self.notifiers.into_iter().for_each(|notifier| {
162 notifier.notify_collected();
163 });
164 try_join_all(
165 self.finished_jobs
166 .into_iter()
167 .map(|finished_job| context.finish_creating_job(finished_job)),
168 )
169 .await?;
170 try_join_all(
171 self.finished_cdc_table_backfill
172 .into_iter()
173 .map(|job_id| context.finish_cdc_table_backfill(job_id)),
174 )
175 .await?;
176 for (database_id, (command, _)) in self.epoch_infos {
177 if let Some((command_ctx, enqueue_time)) = command {
178 let duration_sec = enqueue_time.stop_and_record();
179 Self::report_complete_event(&env, duration_sec, &command_ctx);
180 GLOBAL_META_METRICS
181 .last_committed_barrier_time
182 .with_label_values(&[database_id.to_string().as_str()])
183 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
184 }
185 }
186 version_stats
187 };
188
189 Ok(version_stats)
190 }
191}
192
193impl CompleteBarrierTask {
194 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
195 use risingwave_pb::meta::event_log;
197 let event = event_log::EventBarrierComplete {
198 prev_epoch: command_ctx.barrier_info.prev_epoch(),
199 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
200 duration_sec,
201 command: command_ctx
202 .command
203 .as_ref()
204 .map(|command| command.to_string())
205 .unwrap_or_else(|| "barrier".to_owned()),
206 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
207 };
208 if cfg!(debug_assertions) || Deployment::current().is_ci() {
209 if duration_sec > 5.0 {
211 tracing::warn!(event = ?event,"high barrier latency observed!")
212 }
213 }
214 env.event_log_manager_ref()
215 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
216 }
217}
218
219pub(super) struct BarrierCompleteOutput {
220 #[expect(clippy::type_complexity)]
221 pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
223 pub hummock_version_stats: HummockVersionStats,
224}
225
226impl CompletingTask {
227 pub(super) fn next_completed_barrier<'a>(
228 &'a mut self,
229 periodic_barriers: &mut PeriodicBarriers,
230 checkpoint_control: &mut CheckpointControl,
231 control_stream_manager: &mut ControlStreamManager,
232 context: &Arc<impl GlobalBarrierWorkerContext>,
233 env: &MetaSrvEnv,
234 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
235 if let CompletingTask::None = self
238 && let Some(task) = checkpoint_control
239 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
240 {
241 {
242 let epochs_to_ack = task.epochs_to_ack();
243 let context = context.clone();
244 let await_tree_reg = env.await_tree_reg().clone();
245 let env = env.clone();
246
247 let fut = async move { task.complete_barrier(&*context, env).await };
248 let fut = await_tree_reg
249 .register_derived_root("Barrier Completion Task")
250 .instrument(fut);
251 let join_handle = tokio::spawn(fut);
252
253 *self = CompletingTask::Completing {
254 epochs_to_ack,
255 join_handle,
256 };
257 }
258 }
259
260 async move {
261 if !matches!(self, CompletingTask::Completing { .. }) {
262 return pending().await;
263 };
264 self.next_completed_barrier_inner().await
265 }
266 }
267
268 #[await_tree::instrument]
269 pub(super) async fn wait_completing_task(
270 &mut self,
271 ) -> MetaResult<Option<BarrierCompleteOutput>> {
272 match self {
273 CompletingTask::None => Ok(None),
274 CompletingTask::Completing { .. } => {
275 self.next_completed_barrier_inner().await.map(Some)
276 }
277 CompletingTask::Err(_) => {
278 unreachable!("should not be called on previous err")
279 }
280 }
281 }
282
283 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
284 let CompletingTask::Completing { join_handle, .. } = self else {
285 unreachable!()
286 };
287
288 {
289 {
290 let join_result: MetaResult<_> = try {
291 join_handle
292 .await
293 .context("failed to join completing command")??
294 };
295 let next_completing_command_status = if let Err(e) = &join_result {
298 CompletingTask::Err(e.clone())
299 } else {
300 CompletingTask::None
301 };
302 let completed_command = replace(self, next_completing_command_status);
303 let hummock_version_stats = join_result?;
304
305 must_match!(completed_command, CompletingTask::Completing {
306 epochs_to_ack,
307 ..
308 } => {
309 Ok(BarrierCompleteOutput {
310 epochs_to_ack,
311 hummock_version_stats,
312 })
313 })
314 }
315 }
316 }
317}