1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::id::JobId;
25use risingwave_common::must_match;
26use risingwave_common::util::deployment::Deployment;
27use risingwave_pb::hummock::HummockVersionStats;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 PbListFinishedSource, PbLoadFinishedSource,
30};
31use tokio::task::JoinHandle;
32
33use crate::barrier::checkpoint::CheckpointControl;
34use crate::barrier::command::CommandContext;
35use crate::barrier::context::{CreateSnapshotBackfillJobCommandInfo, GlobalBarrierWorkerContext};
36use crate::barrier::info::BarrierInfo;
37use crate::barrier::notifier::Notifier;
38use crate::barrier::progress::TrackingJob;
39use crate::barrier::rpc::ControlStreamManager;
40use crate::barrier::schedule::PeriodicBarriers;
41use crate::hummock::CommitEpochInfo;
42use crate::manager::MetaSrvEnv;
43use crate::rpc::metrics::GLOBAL_META_METRICS;
44use crate::{MetaError, MetaResult};
45
/// State of the barrier completion work currently tracked by the owner of this value.
pub(super) enum CompletingTask {
    /// No completion task is in flight.
    None,
    /// A completion task has been spawned and its result has not been taken yet.
    Completing {
        /// Epochs to hand back once the task finishes, keyed by database: the `prev_epoch` of
        /// the database barrier (if the task contains one) and the `(job id, epoch)` pairs of
        /// the jobs listed alongside it.
        #[expect(clippy::type_complexity)]
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,

        /// Handle of the spawned task; it yields the version stats on success.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// The previous completion task failed with the stored error. The payload is only kept,
    /// never read, hence the `dead_code` expectation.
    #[expect(dead_code)]
    Err(MetaError),
}
61
/// Everything needed to complete a collected barrier; consumed by `complete_barrier`.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Passed to `commit_epoch` as the first step of completion.
    pub(super) commit_info: CommitEpochInfo,
    /// Jobs passed to `finish_creating_job` after a successful commit.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Job ids passed to `finish_cdc_table_backfill` after a successful commit.
    pub(super) finished_cdc_table_backfill: Vec<JobId>,
    /// Notifiers told whether the collection succeeded or failed.
    pub(super) notifiers: Vec<Notifier>,
    /// Per database: the optional command context of the database barrier together with the
    /// timer that is stopped once its post-collect command has run, and a list of
    /// `(job id, epoch, optional create-job info)` entries; when the info is present, its
    /// post-collect command is run as well.
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos: HashMap<
        DatabaseId,
        (
            Option<(CommandContext, HistogramTimer)>,
            Vec<(JobId, u64, Option<CreateSnapshotBackfillJobCommandInfo>)>,
        ),
    >,
    /// Forwarded to `handle_list_finished_source_ids` when non-empty.
    pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Forwarded to `handle_load_finished_source_ids` when non-empty.
    pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// Forwarded to `handle_refresh_finished_table_ids` when non-empty.
    pub(super) refresh_finished_table_job_ids: Vec<JobId>,
}
85
86impl CompleteBarrierTask {
87 #[expect(clippy::type_complexity)]
88 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
89 self.epoch_infos
90 .iter()
91 .map(|(database_id, (command_context, creating_job_epochs))| {
92 (
93 *database_id,
94 (
95 command_context
96 .as_ref()
97 .map(|(command, _)| command.barrier_info.prev_epoch()),
98 creating_job_epochs
99 .iter()
100 .map(|(job_id, epoch, _)| (*job_id, *epoch))
101 .collect(),
102 ),
103 )
104 })
105 .collect()
106 }
107}
108
109impl CompleteBarrierTask {
110 pub(super) async fn complete_barrier(
111 self,
112 context: &impl GlobalBarrierWorkerContext,
113 env: MetaSrvEnv,
114 ) -> MetaResult<HummockVersionStats> {
115 let result: MetaResult<HummockVersionStats> = try {
116 let wait_commit_timer = GLOBAL_META_METRICS
117 .barrier_wait_commit_latency
118 .start_timer();
119 let version_stats = context.commit_epoch(self.commit_info).await?;
120
121 if !self.list_finished_source_ids.is_empty() {
128 context
129 .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
130 .await?;
131 }
132
133 if !self.load_finished_source_ids.is_empty() {
136 context
137 .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
138 .await?;
139 }
140
141 if !self.refresh_finished_table_job_ids.is_empty() {
143 context
144 .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
145 .await?;
146 }
147
148 for (database_id, (command_ctx, creating_jobs)) in self.epoch_infos {
149 if let Some((command_ctx, enqueue_time)) = command_ctx {
150 let command_name = command_ctx.command.command_name().to_owned();
151 context.post_collect_command(command_ctx.command).await?;
152 let duration_sec = enqueue_time.stop_and_record();
153 Self::report_complete_event(
154 &env,
155 duration_sec,
156 &command_ctx.barrier_info,
157 command_name,
158 );
159 GLOBAL_META_METRICS
160 .last_committed_barrier_time
161 .with_label_values(&[database_id.to_string().as_str()])
162 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
163 }
164 for (_, _, create_info) in creating_jobs {
165 if let Some(create_info) = create_info {
166 context
167 .post_collect_command(create_info.into_post_collect())
168 .await?;
169 }
170 }
171 }
172
173 wait_commit_timer.observe_duration();
174 version_stats
175 };
176
177 let version_stats = {
178 let version_stats = match result {
179 Ok(version_stats) => version_stats,
180 Err(e) => {
181 for notifier in self.notifiers {
182 notifier.notify_collection_failed(e.clone());
183 }
184 return Err(e);
185 }
186 };
187 self.notifiers.into_iter().for_each(|notifier| {
188 notifier.notify_collected();
189 });
190 try_join_all(
191 self.finished_jobs
192 .into_iter()
193 .map(|finished_job| context.finish_creating_job(finished_job)),
194 )
195 .await?;
196 try_join_all(
197 self.finished_cdc_table_backfill
198 .into_iter()
199 .map(|job_id| context.finish_cdc_table_backfill(job_id)),
200 )
201 .await?;
202 version_stats
203 };
204
205 Ok(version_stats)
206 }
207}
208
209impl CompleteBarrierTask {
210 fn report_complete_event(
211 env: &MetaSrvEnv,
212 duration_sec: f64,
213 barrier_info: &BarrierInfo,
214 command: String,
215 ) {
216 use risingwave_pb::meta::event_log;
218 let event = event_log::EventBarrierComplete {
219 prev_epoch: barrier_info.prev_epoch(),
220 cur_epoch: barrier_info.curr_epoch(),
221 duration_sec,
222 command,
223 barrier_kind: barrier_info.kind.as_str_name().to_owned(),
224 };
225 if cfg!(debug_assertions) || Deployment::current().is_ci() {
226 if duration_sec > 5.0 {
228 tracing::warn!(event = ?event,"high barrier latency observed!")
229 }
230 }
231 env.event_log_manager_ref()
232 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
233 }
234}
235
/// Result of a successfully joined barrier completion task.
pub(super) struct BarrierCompleteOutput {
    /// Epochs to acknowledge per database, as captured from the task before it was spawned.
    #[expect(clippy::type_complexity)]
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
    /// Version stats returned by the completion task.
    pub hummock_version_stats: HummockVersionStats,
}
242
243impl CompletingTask {
244 pub(super) fn next_completed_barrier<'a>(
245 &'a mut self,
246 periodic_barriers: &mut PeriodicBarriers,
247 checkpoint_control: &mut CheckpointControl,
248 control_stream_manager: &mut ControlStreamManager,
249 context: &Arc<impl GlobalBarrierWorkerContext>,
250 env: &MetaSrvEnv,
251 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
252 if let CompletingTask::None = self
255 && let Some(task) = checkpoint_control
256 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
257 {
258 {
259 let epochs_to_ack = task.epochs_to_ack();
260 let context = context.clone();
261 let await_tree_reg = env.await_tree_reg().clone();
262 let env = env.clone();
263
264 let fut = async move { task.complete_barrier(&*context, env).await };
265 let fut = await_tree_reg
266 .register_derived_root("Barrier Completion Task")
267 .instrument(fut);
268 let join_handle = tokio::spawn(fut);
269
270 *self = CompletingTask::Completing {
271 epochs_to_ack,
272 join_handle,
273 };
274 }
275 }
276
277 async move {
278 if !matches!(self, CompletingTask::Completing { .. }) {
279 return pending().await;
280 };
281 self.next_completed_barrier_inner().await
282 }
283 }
284
285 #[await_tree::instrument]
286 pub(super) async fn wait_completing_task(
287 &mut self,
288 ) -> MetaResult<Option<BarrierCompleteOutput>> {
289 match self {
290 CompletingTask::None => Ok(None),
291 CompletingTask::Completing { .. } => {
292 self.next_completed_barrier_inner().await.map(Some)
293 }
294 CompletingTask::Err(_) => {
295 unreachable!("should not be called on previous err")
296 }
297 }
298 }
299
300 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
301 let CompletingTask::Completing { join_handle, .. } = self else {
302 unreachable!()
303 };
304
305 {
306 {
307 let join_result: MetaResult<_> = try {
308 join_handle
309 .await
310 .context("failed to join completing command")??
311 };
312 let next_completing_command_status = if let Err(e) = &join_result {
315 CompletingTask::Err(e.clone())
316 } else {
317 CompletingTask::None
318 };
319 let completed_command = replace(self, next_completing_command_status);
320 let hummock_version_stats = join_result?;
321
322 must_match!(completed_command, CompletingTask::Completing {
323 epochs_to_ack,
324 ..
325 } => {
326 Ok(BarrierCompleteOutput {
327 epochs_to_ack,
328 hummock_version_stats,
329 })
330 })
331 }
332 }
333 }
334}