risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_common::util::deployment::Deployment;
26use risingwave_pb::hummock::HummockVersionStats;
27use tokio::task::JoinHandle;
28
29use crate::barrier::checkpoint::CheckpointControl;
30use crate::barrier::command::CommandContext;
31use crate::barrier::context::GlobalBarrierWorkerContext;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::progress::TrackingJob;
34use crate::barrier::rpc::ControlStreamManager;
35use crate::barrier::schedule::PeriodicBarriers;
36use crate::hummock::CommitEpochInfo;
37use crate::manager::MetaSrvEnv;
38use crate::rpc::metrics::GLOBAL_META_METRICS;
39use crate::{MetaError, MetaResult};
40
/// Tracks the barrier-completion task that is currently in flight, if any.
pub(super) enum CompletingTask {
    /// No completion task is being tracked. `next_completed_barrier` only tries to start a
    /// new task while in this state, and `wait_completing_task` returns `Ok(None)`.
    None,
    /// A completion task has been spawned and its result has not been consumed yet.
    Completing {
        /// Snapshot taken from `CompleteBarrierTask::epochs_to_ack` right before the task is
        /// spawned. Keyed by database; the `Option<u64>` is the `prev_epoch` of that
        /// database's command context (when there is one) and the `Vec` holds the
        /// `(TableId, u64)` pairs copied from the task's `epoch_infos`.
        #[expect(clippy::type_complexity)]
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,

        /// Handle of the tokio task running `CompleteBarrierTask::complete_barrier`.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// The previous completion task failed (or could not be joined). The stored error is
    /// never read back, hence the `expect(dead_code)`; `wait_completing_task` treats being
    /// called in this state as unreachable.
    #[expect(dead_code)]
    Err(MetaError),
}
56
/// Everything `complete_barrier` needs in order to commit an epoch and run the follow-up
/// bookkeeping for it.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Passed to `GlobalBarrierWorkerContext::commit_epoch` as the first step of completion.
    pub(super) commit_info: CommitEpochInfo,
    /// Jobs handed to `finish_creating_job` once the commit phase has succeeded.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Ids handed to `finish_cdc_table_backfill` once the commit phase has succeeded.
    pub(super) finished_cdc_table_backfill: Vec<TableId>,
    /// Notified with `notify_collected` on success, or `notify_collection_failed` when the
    /// commit phase fails.
    pub(super) notifiers: Vec<Notifier>,
    /// Per-database epoch information. The optional pair is the command context together
    /// with a histogram timer (bound as `enqueue_time` in `complete_barrier` and stopped
    /// there once the barrier completes); the `Vec` holds `(TableId, u64)` pairs, referred to
    /// as `creating_job_epochs` in `epochs_to_ack`.
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos: HashMap<
        DatabaseId,
        (
            Option<(CommandContext, HistogramTimer)>,
            Vec<(TableId, u64)>,
        ),
    >,
    /// Passed to `handle_load_finished_source_ids` after the commit, when non-empty.
    pub(super) load_finished_source_ids: Vec<u32>,
    /// Passed to `handle_refresh_finished_table_ids` after the commit, when non-empty.
    pub(super) refresh_finished_table_ids: Vec<u32>,
}
78
79impl CompleteBarrierTask {
80 #[expect(clippy::type_complexity)]
81 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
82 self.epoch_infos
83 .iter()
84 .map(|(database_id, (command_context, creating_job_epochs))| {
85 (
86 *database_id,
87 (
88 command_context
89 .as_ref()
90 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
91 creating_job_epochs.clone(),
92 ),
93 )
94 })
95 .collect()
96 }
97}
98
99impl CompleteBarrierTask {
100 pub(super) async fn complete_barrier(
101 self,
102 context: &impl GlobalBarrierWorkerContext,
103 env: MetaSrvEnv,
104 ) -> MetaResult<HummockVersionStats> {
105 let result: MetaResult<HummockVersionStats> = try {
106 let wait_commit_timer = GLOBAL_META_METRICS
107 .barrier_wait_commit_latency
108 .start_timer();
109 let version_stats = context.commit_epoch(self.commit_info).await?;
110
111 if !self.load_finished_source_ids.is_empty() {
114 context
115 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
116 .await?;
117 }
118
119 if !self.refresh_finished_table_ids.is_empty() {
121 context
122 .handle_refresh_finished_table_ids(self.refresh_finished_table_ids.clone())
123 .await?;
124 }
125
126 for command_ctx in self
127 .epoch_infos
128 .values()
129 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
130 {
131 context.post_collect_command(command_ctx).await?;
132 }
133
134 wait_commit_timer.observe_duration();
135 version_stats
136 };
137
138 let version_stats = {
139 let version_stats = match result {
140 Ok(version_stats) => version_stats,
141 Err(e) => {
142 for notifier in self.notifiers {
143 notifier.notify_collection_failed(e.clone());
144 }
145 return Err(e);
146 }
147 };
148 self.notifiers.into_iter().for_each(|notifier| {
149 notifier.notify_collected();
150 });
151 try_join_all(
152 self.finished_jobs
153 .into_iter()
154 .map(|finished_job| context.finish_creating_job(finished_job)),
155 )
156 .await?;
157 try_join_all(
158 self.finished_cdc_table_backfill
159 .into_iter()
160 .map(|job_id| context.finish_cdc_table_backfill(job_id)),
161 )
162 .await?;
163 for (database_id, (command, _)) in self.epoch_infos {
164 if let Some((command_ctx, enqueue_time)) = command {
165 let duration_sec = enqueue_time.stop_and_record();
166 Self::report_complete_event(&env, duration_sec, &command_ctx);
167 GLOBAL_META_METRICS
168 .last_committed_barrier_time
169 .with_label_values(&[database_id.database_id.to_string().as_str()])
170 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
171 }
172 }
173 version_stats
174 };
175
176 Ok(version_stats)
177 }
178}
179
180impl CompleteBarrierTask {
181 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
182 use risingwave_pb::meta::event_log;
184 let event = event_log::EventBarrierComplete {
185 prev_epoch: command_ctx.barrier_info.prev_epoch(),
186 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
187 duration_sec,
188 command: command_ctx
189 .command
190 .as_ref()
191 .map(|command| command.to_string())
192 .unwrap_or_else(|| "barrier".to_owned()),
193 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
194 };
195 if cfg!(debug_assertions) || Deployment::current().is_ci() {
196 if duration_sec > 5.0 {
198 tracing::warn!(event = ?event,"high barrier latency observed!")
199 }
200 }
201 env.event_log_manager_ref()
202 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
203 }
204}
205
/// Result handed back once a spawned barrier-completion task has been joined successfully.
pub(super) struct BarrierCompleteOutput {
    /// The `epochs_to_ack` snapshot that was stored in `CompletingTask::Completing` when the
    /// task was spawned: per database, the optional `prev_epoch` of its command plus the
    /// `(TableId, u64)` pairs of that database.
    #[expect(clippy::type_complexity)]
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
    /// Version stats returned by the completion task (the value produced by `commit_epoch`).
    pub hummock_version_stats: HummockVersionStats,
}
212
213impl CompletingTask {
214 pub(super) fn next_completed_barrier<'a>(
215 &'a mut self,
216 periodic_barriers: &mut PeriodicBarriers,
217 checkpoint_control: &mut CheckpointControl,
218 control_stream_manager: &mut ControlStreamManager,
219 context: &Arc<impl GlobalBarrierWorkerContext>,
220 env: &MetaSrvEnv,
221 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
222 if let CompletingTask::None = self
225 && let Some(task) = checkpoint_control
226 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
227 {
228 {
229 let epochs_to_ack = task.epochs_to_ack();
230 let context = context.clone();
231 let await_tree_reg = env.await_tree_reg().clone();
232 let env = env.clone();
233
234 let fut = async move { task.complete_barrier(&*context, env).await };
235 let fut = await_tree_reg
236 .register_derived_root("Barrier Completion Task")
237 .instrument(fut);
238 let join_handle = tokio::spawn(fut);
239
240 *self = CompletingTask::Completing {
241 epochs_to_ack,
242 join_handle,
243 };
244 }
245 }
246
247 async move {
248 if !matches!(self, CompletingTask::Completing { .. }) {
249 return pending().await;
250 };
251 self.next_completed_barrier_inner().await
252 }
253 }
254
255 #[await_tree::instrument]
256 pub(super) async fn wait_completing_task(
257 &mut self,
258 ) -> MetaResult<Option<BarrierCompleteOutput>> {
259 match self {
260 CompletingTask::None => Ok(None),
261 CompletingTask::Completing { .. } => {
262 self.next_completed_barrier_inner().await.map(Some)
263 }
264 CompletingTask::Err(_) => {
265 unreachable!("should not be called on previous err")
266 }
267 }
268 }
269
270 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
271 let CompletingTask::Completing { join_handle, .. } = self else {
272 unreachable!()
273 };
274
275 {
276 {
277 let join_result: MetaResult<_> = try {
278 join_handle
279 .await
280 .context("failed to join completing command")??
281 };
282 let next_completing_command_status = if let Err(e) = &join_result {
285 CompletingTask::Err(e.clone())
286 } else {
287 CompletingTask::None
288 };
289 let completed_command = replace(self, next_completing_command_status);
290 let hummock_version_stats = join_result?;
291
292 must_match!(completed_command, CompletingTask::Completing {
293 epochs_to_ack,
294 ..
295 } => {
296 Ok(BarrierCompleteOutput {
297 epochs_to_ack,
298 hummock_version_stats,
299 })
300 })
301 }
302 }
303 }
304}