risingwave_meta/barrier/
complete_task.rs1use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_pb::hummock::HummockVersionStats;
26use tokio::task::JoinHandle;
27
28use crate::barrier::checkpoint::CheckpointControl;
29use crate::barrier::command::CommandContext;
30use crate::barrier::context::GlobalBarrierWorkerContext;
31use crate::barrier::notifier::Notifier;
32use crate::barrier::progress::TrackingJob;
33use crate::barrier::rpc::ControlStreamManager;
34use crate::barrier::schedule::PeriodicBarriers;
35use crate::hummock::CommitEpochInfo;
36use crate::manager::MetaSrvEnv;
37use crate::rpc::metrics::GLOBAL_META_METRICS;
38use crate::{MetaError, MetaResult};
39
/// State of the barrier-completion task tracked by the barrier worker.
///
/// At most one task is in flight at a time. Transitions driven in this file:
/// - `None` -> `Completing` when `next_completed_barrier` spawns a task;
/// - `Completing` -> `None` when the spawned task returns `Ok`;
/// - `Completing` -> `Err` when the task (or joining it) fails.
///
/// Once in `Err`, `next_completed_barrier` never starts a new task.
pub(super) enum CompletingTask {
    /// No completion task is running.
    None,
    /// A spawned task is committing a batch of collected barriers.
    Completing {
        /// Per-database epochs to acknowledge once the task finishes: the
        /// `prev_epoch` of the completed command (if any) and the creating-job
        /// `(TableId, u64)` epoch pairs.
        #[expect(clippy::type_complexity)]
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,

        /// Handle of the spawned `CompleteBarrierTask::complete_barrier` future.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// The last completion task failed. The error is kept here but is not read
    /// anywhere in this file.
    #[expect(dead_code)]
    Err(MetaError),
}
55
/// Everything `complete_barrier` needs to finish one batch of collected
/// barriers: the Hummock commit input, the waiters to notify, and the
/// per-database bookkeeping.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Passed to `GlobalBarrierWorkerContext::commit_epoch`.
    pub(super) commit_info: CommitEpochInfo,
    /// Jobs handed to `finish_creating_job` after the commit phase succeeds.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Told whether collection succeeded (`notify_collected`) or failed
    /// (`notify_collection_failed`).
    pub(super) notifiers: Vec<Notifier>,
    /// Per database:
    /// - the command to complete, if any, paired with a timer that is stopped
    ///   (`stop_and_record`) when the command completes;
    /// - the creating-job `(TableId, u64)` epoch pairs.
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos: HashMap<
        DatabaseId,
        (
            Option<(CommandContext, HistogramTimer)>,
            Vec<(TableId, u64)>,
        ),
    >,
}
71
72impl CompleteBarrierTask {
73 #[expect(clippy::type_complexity)]
74 pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
75 self.epoch_infos
76 .iter()
77 .map(|(database_id, (command_context, creating_job_epochs))| {
78 (
79 *database_id,
80 (
81 command_context
82 .as_ref()
83 .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
84 creating_job_epochs.clone(),
85 ),
86 )
87 })
88 .collect()
89 }
90}
91
92impl CompleteBarrierTask {
93 pub(super) async fn complete_barrier(
94 self,
95 context: &impl GlobalBarrierWorkerContext,
96 env: MetaSrvEnv,
97 ) -> MetaResult<HummockVersionStats> {
98 let result: MetaResult<HummockVersionStats> = try {
99 let wait_commit_timer = GLOBAL_META_METRICS
100 .barrier_wait_commit_latency
101 .start_timer();
102 let version_stats = context.commit_epoch(self.commit_info).await?;
103 for command_ctx in self
104 .epoch_infos
105 .values()
106 .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
107 {
108 context.post_collect_command(command_ctx).await?;
109 }
110
111 wait_commit_timer.observe_duration();
112 version_stats
113 };
114
115 let version_stats = {
116 let version_stats = match result {
117 Ok(version_stats) => version_stats,
118 Err(e) => {
119 for notifier in self.notifiers {
120 notifier.notify_collection_failed(e.clone());
121 }
122 return Err(e);
123 }
124 };
125 self.notifiers.into_iter().for_each(|notifier| {
126 notifier.notify_collected();
127 });
128 try_join_all(
129 self.finished_jobs
130 .into_iter()
131 .map(|finished_job| context.finish_creating_job(finished_job)),
132 )
133 .await?;
134 for (database_id, (command, _)) in self.epoch_infos {
135 if let Some((command_ctx, enqueue_time)) = command {
136 let duration_sec = enqueue_time.stop_and_record();
137 Self::report_complete_event(&env, duration_sec, &command_ctx);
138 GLOBAL_META_METRICS
139 .last_committed_barrier_time
140 .with_label_values(&[database_id.database_id.to_string().as_str()])
141 .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
142 }
143 }
144 version_stats
145 };
146
147 Ok(version_stats)
148 }
149}
150
151impl CompleteBarrierTask {
152 fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
153 use risingwave_pb::meta::event_log;
155 let event = event_log::EventBarrierComplete {
156 prev_epoch: command_ctx.barrier_info.prev_epoch(),
157 cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
158 duration_sec,
159 command: command_ctx
160 .command
161 .as_ref()
162 .map(|command| command.to_string())
163 .unwrap_or_else(|| "barrier".to_owned()),
164 barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
165 };
166 env.event_log_manager_ref()
167 .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
168 }
169}
170
/// Result handed back when an in-flight barrier-completion task succeeds.
pub(super) struct BarrierCompleteOutput {
    /// Per-database epochs to acknowledge. This is the snapshot taken from the
    /// task's `epochs_to_ack` when the task was spawned.
    #[expect(clippy::type_complexity)]
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,

    /// Version stats returned by the task's Hummock commit.
    pub hummock_version_stats: HummockVersionStats,
}
177
178impl CompletingTask {
179 pub(super) fn next_completed_barrier<'a>(
180 &'a mut self,
181 periodic_barriers: &mut PeriodicBarriers,
182 checkpoint_control: &mut CheckpointControl,
183 control_stream_manager: &mut ControlStreamManager,
184 context: &Arc<impl GlobalBarrierWorkerContext>,
185 env: &MetaSrvEnv,
186 ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
187 if let CompletingTask::None = self {
190 if let Some(task) = checkpoint_control
191 .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
192 {
193 {
194 let epochs_to_ack = task.epochs_to_ack();
195 let context = context.clone();
196 let env = env.clone();
197 let join_handle =
198 tokio::spawn(async move { task.complete_barrier(&*context, env).await });
199 *self = CompletingTask::Completing {
200 epochs_to_ack,
201 join_handle,
202 };
203 }
204 }
205 }
206
207 async move {
208 if !matches!(self, CompletingTask::Completing { .. }) {
209 return pending().await;
210 };
211 self.next_completed_barrier_inner().await
212 }
213 }
214
215 pub(super) async fn wait_completing_task(
216 &mut self,
217 ) -> MetaResult<Option<BarrierCompleteOutput>> {
218 match self {
219 CompletingTask::None => Ok(None),
220 CompletingTask::Completing { .. } => {
221 self.next_completed_barrier_inner().await.map(Some)
222 }
223 CompletingTask::Err(_) => {
224 unreachable!("should not be called on previous err")
225 }
226 }
227 }
228
229 async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
230 let CompletingTask::Completing { join_handle, .. } = self else {
231 unreachable!()
232 };
233
234 {
235 {
236 let join_result: MetaResult<_> = try {
237 join_handle
238 .await
239 .context("failed to join completing command")??
240 };
241 let next_completing_command_status = if let Err(e) = &join_result {
244 CompletingTask::Err(e.clone())
245 } else {
246 CompletingTask::None
247 };
248 let completed_command = replace(self, next_completing_command_status);
249 let hummock_version_stats = join_result?;
250
251 must_match!(completed_command, CompletingTask::Completing {
252 epochs_to_ack,
253 ..
254 } => {
255 Ok(BarrierCompleteOutput {
256 epochs_to_ack,
257 hummock_version_stats,
258 })
259 })
260 }
261 }
262 }
263}