risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_common::util::deployment::Deployment;
26use risingwave_pb::hummock::HummockVersionStats;
27use tokio::task::JoinHandle;
28
29use crate::barrier::checkpoint::CheckpointControl;
30use crate::barrier::command::CommandContext;
31use crate::barrier::context::GlobalBarrierWorkerContext;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::progress::TrackingJob;
34use crate::barrier::rpc::ControlStreamManager;
35use crate::barrier::schedule::PeriodicBarriers;
36use crate::hummock::CommitEpochInfo;
37use crate::manager::MetaSrvEnv;
38use crate::rpc::metrics::GLOBAL_META_METRICS;
39use crate::{MetaError, MetaResult};
40
41pub(super) enum CompletingTask {
42 None,
43 Completing {
44 #[expect(clippy::type_complexity)]
45 epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
47
48 join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
52 },
53 #[expect(dead_code)]
54 Err(MetaError),
55}
56
57#[derive(Default)]
59pub(super) struct CompleteBarrierTask {
60 pub(super) commit_info: CommitEpochInfo,
61 pub(super) finished_jobs: Vec<TrackingJob>,
62 pub(super) notifiers: Vec<Notifier>,
63 #[expect(clippy::type_complexity)]
65 pub(super) epoch_infos: HashMap<
66 DatabaseId,
67 (
68 Option<(CommandContext, HistogramTimer)>,
69 Vec<(TableId, u64)>,
70 ),
71 >,
72 pub(super) load_finished_source_ids: Vec<u32>,
74}
75
76impl CompleteBarrierTask {
77 #[expect(clippy::type_complexity)]
78 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
79 self.epoch_infos
80 .iter()
81 .map(|(database_id, (command_context, creating_job_epochs))| {
82 (
83 *database_id,
84 (
85 command_context
86 .as_ref()
87 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
88 creating_job_epochs.clone(),
89 ),
90 )
91 })
92 .collect()
93 }
94}
95
96impl CompleteBarrierTask {
97 pub(super) async fn complete_barrier(
98 self,
99 context: &impl GlobalBarrierWorkerContext,
100 env: MetaSrvEnv,
101 ) -> MetaResult<HummockVersionStats> {
102 let result: MetaResult<HummockVersionStats> = try {
103 let wait_commit_timer = GLOBAL_META_METRICS
104 .barrier_wait_commit_latency
105 .start_timer();
106 let version_stats = context.commit_epoch(self.commit_info).await?;
107
108 if !self.load_finished_source_ids.is_empty() {
111 context
112 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
113 .await?;
114 }
115
116 for command_ctx in self
117 .epoch_infos
118 .values()
119 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
120 {
121 context.post_collect_command(command_ctx).await?;
122 }
123
124 wait_commit_timer.observe_duration();
125 version_stats
126 };
127
128 let version_stats = {
129 let version_stats = match result {
130 Ok(version_stats) => version_stats,
131 Err(e) => {
132 for notifier in self.notifiers {
133 notifier.notify_collection_failed(e.clone());
134 }
135 return Err(e);
136 }
137 };
138 self.notifiers.into_iter().for_each(|notifier| {
139 notifier.notify_collected();
140 });
141 try_join_all(
142 self.finished_jobs
143 .into_iter()
144 .map(|finished_job| context.finish_creating_job(finished_job)),
145 )
146 .await?;
147 for (database_id, (command, _)) in self.epoch_infos {
148 if let Some((command_ctx, enqueue_time)) = command {
149 let duration_sec = enqueue_time.stop_and_record();
150 Self::report_complete_event(&env, duration_sec, &command_ctx);
151 GLOBAL_META_METRICS
152 .last_committed_barrier_time
153 .with_label_values(&[database_id.database_id.to_string().as_str()])
154 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
155 }
156 }
157 version_stats
158 };
159
160 Ok(version_stats)
161 }
162}
163
164impl CompleteBarrierTask {
165 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
166 use risingwave_pb::meta::event_log;
168 let event = event_log::EventBarrierComplete {
169 prev_epoch: command_ctx.barrier_info.prev_epoch(),
170 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
171 duration_sec,
172 command: command_ctx
173 .command
174 .as_ref()
175 .map(|command| command.to_string())
176 .unwrap_or_else(|| "barrier".to_owned()),
177 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
178 };
179 if cfg!(debug_assertions) || Deployment::current().is_ci() {
180 if duration_sec > 5.0 {
182 tracing::warn!(event = ?event,"high barrier latency observed!")
183 }
184 }
185 env.event_log_manager_ref()
186 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
187 }
188}
189
190pub(super) struct BarrierCompleteOutput {
191 #[expect(clippy::type_complexity)]
192 pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
194 pub hummock_version_stats: HummockVersionStats,
195}
196
197impl CompletingTask {
198 pub(super) fn next_completed_barrier<'a>(
199 &'a mut self,
200 periodic_barriers: &mut PeriodicBarriers,
201 checkpoint_control: &mut CheckpointControl,
202 control_stream_manager: &mut ControlStreamManager,
203 context: &Arc<impl GlobalBarrierWorkerContext>,
204 env: &MetaSrvEnv,
205 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
206 if let CompletingTask::None = self
209 && let Some(task) = checkpoint_control
210 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
211 {
212 {
213 let epochs_to_ack = task.epochs_to_ack();
214 let context = context.clone();
215 let await_tree_reg = env.await_tree_reg().clone();
216 let env = env.clone();
217
218 let fut = async move { task.complete_barrier(&*context, env).await };
219 let fut = await_tree_reg
220 .register_derived_root("Barrier Completion Task")
221 .instrument(fut);
222 let join_handle = tokio::spawn(fut);
223
224 *self = CompletingTask::Completing {
225 epochs_to_ack,
226 join_handle,
227 };
228 }
229 }
230
231 async move {
232 if !matches!(self, CompletingTask::Completing { .. }) {
233 return pending().await;
234 };
235 self.next_completed_barrier_inner().await
236 }
237 }
238
239 #[await_tree::instrument]
240 pub(super) async fn wait_completing_task(
241 &mut self,
242 ) -> MetaResult<Option<BarrierCompleteOutput>> {
243 match self {
244 CompletingTask::None => Ok(None),
245 CompletingTask::Completing { .. } => {
246 self.next_completed_barrier_inner().await.map(Some)
247 }
248 CompletingTask::Err(_) => {
249 unreachable!("should not be called on previous err")
250 }
251 }
252 }
253
254 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
255 let CompletingTask::Completing { join_handle, .. } = self else {
256 unreachable!()
257 };
258
259 {
260 {
261 let join_result: MetaResult<_> = try {
262 join_handle
263 .await
264 .context("failed to join completing command")??
265 };
266 let next_completing_command_status = if let Err(e) = &join_result {
269 CompletingTask::Err(e.clone())
270 } else {
271 CompletingTask::None
272 };
273 let completed_command = replace(self, next_completing_command_status);
274 let hummock_version_stats = join_result?;
275
276 must_match!(completed_command, CompletingTask::Completing {
277 epochs_to_ack,
278 ..
279 } => {
280 Ok(BarrierCompleteOutput {
281 epochs_to_ack,
282 hummock_version_stats,
283 })
284 })
285 }
286 }
287 }
288}