risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::id::JobId;
25use risingwave_common::must_match;
26use risingwave_common::util::deployment::Deployment;
27use risingwave_pb::hummock::HummockVersionStats;
28use tokio::task::JoinHandle;
29
30use crate::barrier::checkpoint::CheckpointControl;
31use crate::barrier::command::CommandContext;
32use crate::barrier::context::GlobalBarrierWorkerContext;
33use crate::barrier::notifier::Notifier;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::rpc::ControlStreamManager;
36use crate::barrier::schedule::PeriodicBarriers;
37use crate::hummock::CommitEpochInfo;
38use crate::manager::MetaSrvEnv;
39use crate::rpc::metrics::GLOBAL_META_METRICS;
40use crate::{MetaError, MetaResult};
41
42pub(super) enum CompletingTask {
43 None,
44 Completing {
45 #[expect(clippy::type_complexity)]
46 epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
48
49 join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
53 },
54 #[expect(dead_code)]
55 Err(MetaError),
56}
57
58#[derive(Default)]
60pub(super) struct CompleteBarrierTask {
61 pub(super) commit_info: CommitEpochInfo,
62 pub(super) finished_jobs: Vec<TrackingJob>,
63 pub(super) finished_cdc_table_backfill: Vec<JobId>,
64 pub(super) notifiers: Vec<Notifier>,
65 #[expect(clippy::type_complexity)]
67 pub(super) epoch_infos:
68 HashMap<DatabaseId, (Option<(CommandContext, HistogramTimer)>, Vec<(JobId, u64)>)>,
69 pub(super) list_finished_source_ids: Vec<u32>,
71 pub(super) load_finished_source_ids: Vec<u32>,
73 pub(super) refresh_finished_table_job_ids: Vec<JobId>,
75}
76
77impl CompleteBarrierTask {
78 #[expect(clippy::type_complexity)]
79 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
80 self.epoch_infos
81 .iter()
82 .map(|(database_id, (command_context, creating_job_epochs))| {
83 (
84 *database_id,
85 (
86 command_context
87 .as_ref()
88 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
89 creating_job_epochs.clone(),
90 ),
91 )
92 })
93 .collect()
94 }
95}
96
97impl CompleteBarrierTask {
98 pub(super) async fn complete_barrier(
99 self,
100 context: &impl GlobalBarrierWorkerContext,
101 env: MetaSrvEnv,
102 ) -> MetaResult<HummockVersionStats> {
103 let result: MetaResult<HummockVersionStats> = try {
104 let wait_commit_timer = GLOBAL_META_METRICS
105 .barrier_wait_commit_latency
106 .start_timer();
107 let version_stats = context.commit_epoch(self.commit_info).await?;
108
109 if !self.list_finished_source_ids.is_empty() {
116 context
117 .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
118 .await?;
119 }
120
121 if !self.load_finished_source_ids.is_empty() {
124 context
125 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
126 .await?;
127 }
128
129 if !self.refresh_finished_table_job_ids.is_empty() {
131 context
132 .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
133 .await?;
134 }
135
136 for command_ctx in self
137 .epoch_infos
138 .values()
139 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
140 {
141 context.post_collect_command(command_ctx).await?;
142 }
143
144 wait_commit_timer.observe_duration();
145 version_stats
146 };
147
148 let version_stats = {
149 let version_stats = match result {
150 Ok(version_stats) => version_stats,
151 Err(e) => {
152 for notifier in self.notifiers {
153 notifier.notify_collection_failed(e.clone());
154 }
155 return Err(e);
156 }
157 };
158 self.notifiers.into_iter().for_each(|notifier| {
159 notifier.notify_collected();
160 });
161 try_join_all(
162 self.finished_jobs
163 .into_iter()
164 .map(|finished_job| context.finish_creating_job(finished_job)),
165 )
166 .await?;
167 try_join_all(
168 self.finished_cdc_table_backfill
169 .into_iter()
170 .map(|job_id| context.finish_cdc_table_backfill(job_id)),
171 )
172 .await?;
173 for (database_id, (command, _)) in self.epoch_infos {
174 if let Some((command_ctx, enqueue_time)) = command {
175 let duration_sec = enqueue_time.stop_and_record();
176 Self::report_complete_event(&env, duration_sec, &command_ctx);
177 GLOBAL_META_METRICS
178 .last_committed_barrier_time
179 .with_label_values(&[database_id.to_string().as_str()])
180 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
181 }
182 }
183 version_stats
184 };
185
186 Ok(version_stats)
187 }
188}
189
190impl CompleteBarrierTask {
191 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
192 use risingwave_pb::meta::event_log;
194 let event = event_log::EventBarrierComplete {
195 prev_epoch: command_ctx.barrier_info.prev_epoch(),
196 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
197 duration_sec,
198 command: command_ctx
199 .command
200 .as_ref()
201 .map(|command| command.to_string())
202 .unwrap_or_else(|| "barrier".to_owned()),
203 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
204 };
205 if cfg!(debug_assertions) || Deployment::current().is_ci() {
206 if duration_sec > 5.0 {
208 tracing::warn!(event = ?event,"high barrier latency observed!")
209 }
210 }
211 env.event_log_manager_ref()
212 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
213 }
214}
215
216pub(super) struct BarrierCompleteOutput {
217 #[expect(clippy::type_complexity)]
218 pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
220 pub hummock_version_stats: HummockVersionStats,
221}
222
223impl CompletingTask {
224 pub(super) fn next_completed_barrier<'a>(
225 &'a mut self,
226 periodic_barriers: &mut PeriodicBarriers,
227 checkpoint_control: &mut CheckpointControl,
228 control_stream_manager: &mut ControlStreamManager,
229 context: &Arc<impl GlobalBarrierWorkerContext>,
230 env: &MetaSrvEnv,
231 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
232 if let CompletingTask::None = self
235 && let Some(task) = checkpoint_control
236 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
237 {
238 {
239 let epochs_to_ack = task.epochs_to_ack();
240 let context = context.clone();
241 let await_tree_reg = env.await_tree_reg().clone();
242 let env = env.clone();
243
244 let fut = async move { task.complete_barrier(&*context, env).await };
245 let fut = await_tree_reg
246 .register_derived_root("Barrier Completion Task")
247 .instrument(fut);
248 let join_handle = tokio::spawn(fut);
249
250 *self = CompletingTask::Completing {
251 epochs_to_ack,
252 join_handle,
253 };
254 }
255 }
256
257 async move {
258 if !matches!(self, CompletingTask::Completing { .. }) {
259 return pending().await;
260 };
261 self.next_completed_barrier_inner().await
262 }
263 }
264
265 #[await_tree::instrument]
266 pub(super) async fn wait_completing_task(
267 &mut self,
268 ) -> MetaResult<Option<BarrierCompleteOutput>> {
269 match self {
270 CompletingTask::None => Ok(None),
271 CompletingTask::Completing { .. } => {
272 self.next_completed_barrier_inner().await.map(Some)
273 }
274 CompletingTask::Err(_) => {
275 unreachable!("should not be called on previous err")
276 }
277 }
278 }
279
280 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
281 let CompletingTask::Completing { join_handle, .. } = self else {
282 unreachable!()
283 };
284
285 {
286 {
287 let join_result: MetaResult<_> = try {
288 join_handle
289 .await
290 .context("failed to join completing command")??
291 };
292 let next_completing_command_status = if let Err(e) = &join_result {
295 CompletingTask::Err(e.clone())
296 } else {
297 CompletingTask::None
298 };
299 let completed_command = replace(self, next_completing_command_status);
300 let hummock_version_stats = join_result?;
301
302 must_match!(completed_command, CompletingTask::Completing {
303 epochs_to_ack,
304 ..
305 } => {
306 Ok(BarrierCompleteOutput {
307 epochs_to_ack,
308 hummock_version_stats,
309 })
310 })
311 }
312 }
313 }
314}