risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_common::util::deployment::Deployment;
26use risingwave_pb::hummock::HummockVersionStats;
27use tokio::task::JoinHandle;
28
29use crate::barrier::checkpoint::CheckpointControl;
30use crate::barrier::command::CommandContext;
31use crate::barrier::context::GlobalBarrierWorkerContext;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::progress::TrackingJob;
34use crate::barrier::rpc::ControlStreamManager;
35use crate::barrier::schedule::PeriodicBarriers;
36use crate::hummock::CommitEpochInfo;
37use crate::manager::MetaSrvEnv;
38use crate::rpc::metrics::GLOBAL_META_METRICS;
39use crate::{MetaError, MetaResult};
40
41pub(super) enum CompletingTask {
42 None,
43 Completing {
44 #[expect(clippy::type_complexity)]
45 epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
47
48 join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
52 },
53 #[expect(dead_code)]
54 Err(MetaError),
55}
56
57#[derive(Default)]
58pub(super) struct CompleteBarrierTask {
59 pub(super) commit_info: CommitEpochInfo,
60 pub(super) finished_jobs: Vec<TrackingJob>,
61 pub(super) notifiers: Vec<Notifier>,
62 #[expect(clippy::type_complexity)]
64 pub(super) epoch_infos: HashMap<
65 DatabaseId,
66 (
67 Option<(CommandContext, HistogramTimer)>,
68 Vec<(TableId, u64)>,
69 ),
70 >,
71}
72
73impl CompleteBarrierTask {
74 #[expect(clippy::type_complexity)]
75 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
76 self.epoch_infos
77 .iter()
78 .map(|(database_id, (command_context, creating_job_epochs))| {
79 (
80 *database_id,
81 (
82 command_context
83 .as_ref()
84 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
85 creating_job_epochs.clone(),
86 ),
87 )
88 })
89 .collect()
90 }
91}
92
93impl CompleteBarrierTask {
94 pub(super) async fn complete_barrier(
95 self,
96 context: &impl GlobalBarrierWorkerContext,
97 env: MetaSrvEnv,
98 ) -> MetaResult<HummockVersionStats> {
99 let result: MetaResult<HummockVersionStats> = try {
100 let wait_commit_timer = GLOBAL_META_METRICS
101 .barrier_wait_commit_latency
102 .start_timer();
103 let version_stats = context.commit_epoch(self.commit_info).await?;
104 for command_ctx in self
105 .epoch_infos
106 .values()
107 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
108 {
109 context.post_collect_command(command_ctx).await?;
110 }
111
112 wait_commit_timer.observe_duration();
113 version_stats
114 };
115
116 let version_stats = {
117 let version_stats = match result {
118 Ok(version_stats) => version_stats,
119 Err(e) => {
120 for notifier in self.notifiers {
121 notifier.notify_collection_failed(e.clone());
122 }
123 return Err(e);
124 }
125 };
126 self.notifiers.into_iter().for_each(|notifier| {
127 notifier.notify_collected();
128 });
129 try_join_all(
130 self.finished_jobs
131 .into_iter()
132 .map(|finished_job| context.finish_creating_job(finished_job)),
133 )
134 .await?;
135 for (database_id, (command, _)) in self.epoch_infos {
136 if let Some((command_ctx, enqueue_time)) = command {
137 let duration_sec = enqueue_time.stop_and_record();
138 Self::report_complete_event(&env, duration_sec, &command_ctx);
139 GLOBAL_META_METRICS
140 .last_committed_barrier_time
141 .with_label_values(&[database_id.database_id.to_string().as_str()])
142 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
143 }
144 }
145 version_stats
146 };
147
148 Ok(version_stats)
149 }
150}
151
152impl CompleteBarrierTask {
153 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
154 use risingwave_pb::meta::event_log;
156 let event = event_log::EventBarrierComplete {
157 prev_epoch: command_ctx.barrier_info.prev_epoch(),
158 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
159 duration_sec,
160 command: command_ctx
161 .command
162 .as_ref()
163 .map(|command| command.to_string())
164 .unwrap_or_else(|| "barrier".to_owned()),
165 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
166 };
167 if cfg!(debug_assertions) || Deployment::current().is_ci() {
168 if duration_sec > 5.0 {
170 tracing::warn!(event = ?event,"high barrier latency observed!")
171 }
172 }
173 env.event_log_manager_ref()
174 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
175 }
176}
177
178pub(super) struct BarrierCompleteOutput {
179 #[expect(clippy::type_complexity)]
180 pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
182 pub hummock_version_stats: HummockVersionStats,
183}
184
185impl CompletingTask {
186 pub(super) fn next_completed_barrier<'a>(
187 &'a mut self,
188 periodic_barriers: &mut PeriodicBarriers,
189 checkpoint_control: &mut CheckpointControl,
190 control_stream_manager: &mut ControlStreamManager,
191 context: &Arc<impl GlobalBarrierWorkerContext>,
192 env: &MetaSrvEnv,
193 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
194 if let CompletingTask::None = self {
197 if let Some(task) = checkpoint_control
198 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
199 {
200 {
201 let epochs_to_ack = task.epochs_to_ack();
202 let context = context.clone();
203 let await_tree_reg = env.await_tree_reg().clone();
204 let env = env.clone();
205
206 let fut = async move { task.complete_barrier(&*context, env).await };
207 let fut = await_tree_reg
208 .register_derived_root("Barrier Completion Task")
209 .instrument(fut);
210 let join_handle = tokio::spawn(fut);
211
212 *self = CompletingTask::Completing {
213 epochs_to_ack,
214 join_handle,
215 };
216 }
217 }
218 }
219
220 async move {
221 if !matches!(self, CompletingTask::Completing { .. }) {
222 return pending().await;
223 };
224 self.next_completed_barrier_inner().await
225 }
226 }
227
228 #[await_tree::instrument]
229 pub(super) async fn wait_completing_task(
230 &mut self,
231 ) -> MetaResult<Option<BarrierCompleteOutput>> {
232 match self {
233 CompletingTask::None => Ok(None),
234 CompletingTask::Completing { .. } => {
235 self.next_completed_barrier_inner().await.map(Some)
236 }
237 CompletingTask::Err(_) => {
238 unreachable!("should not be called on previous err")
239 }
240 }
241 }
242
243 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
244 let CompletingTask::Completing { join_handle, .. } = self else {
245 unreachable!()
246 };
247
248 {
249 {
250 let join_result: MetaResult<_> = try {
251 join_handle
252 .await
253 .context("failed to join completing command")??
254 };
255 let next_completing_command_status = if let Err(e) = &join_result {
258 CompletingTask::Err(e.clone())
259 } else {
260 CompletingTask::None
261 };
262 let completed_command = replace(self, next_completing_command_status);
263 let hummock_version_stats = join_result?;
264
265 must_match!(completed_command, CompletingTask::Completing {
266 epochs_to_ack,
267 ..
268 } => {
269 Ok(BarrierCompleteOutput {
270 epochs_to_ack,
271 hummock_version_stats,
272 })
273 })
274 }
275 }
276 }
277}