risingwave_meta/barrier/
complete_task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_pb::hummock::HummockVersionStats;
26use tokio::task::JoinHandle;
27
28use crate::barrier::checkpoint::CheckpointControl;
29use crate::barrier::command::CommandContext;
30use crate::barrier::context::GlobalBarrierWorkerContext;
31use crate::barrier::notifier::Notifier;
32use crate::barrier::progress::TrackingJob;
33use crate::barrier::rpc::ControlStreamManager;
34use crate::barrier::schedule::PeriodicBarriers;
35use crate::hummock::CommitEpochInfo;
36use crate::manager::MetaSrvEnv;
37use crate::rpc::metrics::GLOBAL_META_METRICS;
38use crate::{MetaError, MetaResult};
39
/// State of the (at most one) in-flight barrier-completion task.
pub(super) enum CompletingTask {
    /// No completion task is currently running.
    None,
    /// A completion task has been spawned and has not been joined yet.
    Completing {
        #[expect(clippy::type_complexity)]
        /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,

        // The join handle of the spawned task that completes the barrier.
        // On success the task yields the `HummockVersionStats` returned by
        // `CompleteBarrierTask::complete_barrier`; on failure it yields the `MetaError`.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// A previous completion task failed. This state is entered in
    /// `next_completed_barrier_inner` with a clone of the error; the stored error is not
    /// read back anywhere in this file, hence the `dead_code` expectation.
    #[expect(dead_code)]
    Err(MetaError),
}
55
/// Everything needed to complete one round of collected barriers.
///
/// The task is consumed by `complete_barrier`, which uses each field as described below.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Passed to `GlobalBarrierWorkerContext::commit_epoch`.
    pub(super) commit_info: CommitEpochInfo,
    /// Jobs handed to `GlobalBarrierWorkerContext::finish_creating_job` after a successful commit.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Notifiers told either that the barrier was collected or that collection failed.
    pub(super) notifiers: Vec<Notifier>,
    /// `database_id` -> (Some((`command_ctx`, `enqueue_time`)), vec!((`creating_job_id`, `epoch`)))
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos: HashMap<
        DatabaseId,
        (
            Option<(CommandContext, HistogramTimer)>,
            Vec<(TableId, u64)>,
        ),
    >,
}
71
72impl CompleteBarrierTask {
73    #[expect(clippy::type_complexity)]
74    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
75        self.epoch_infos
76            .iter()
77            .map(|(database_id, (command_context, creating_job_epochs))| {
78                (
79                    *database_id,
80                    (
81                        command_context
82                            .as_ref()
83                            .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
84                        creating_job_epochs.clone(),
85                    ),
86                )
87            })
88            .collect()
89    }
90}
91
92impl CompleteBarrierTask {
93    pub(super) async fn complete_barrier(
94        self,
95        context: &impl GlobalBarrierWorkerContext,
96        env: MetaSrvEnv,
97    ) -> MetaResult<HummockVersionStats> {
98        let result: MetaResult<HummockVersionStats> = try {
99            let wait_commit_timer = GLOBAL_META_METRICS
100                .barrier_wait_commit_latency
101                .start_timer();
102            let version_stats = context.commit_epoch(self.commit_info).await?;
103            for command_ctx in self
104                .epoch_infos
105                .values()
106                .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
107            {
108                context.post_collect_command(command_ctx).await?;
109            }
110
111            wait_commit_timer.observe_duration();
112            version_stats
113        };
114
115        let version_stats = {
116            let version_stats = match result {
117                Ok(version_stats) => version_stats,
118                Err(e) => {
119                    for notifier in self.notifiers {
120                        notifier.notify_collection_failed(e.clone());
121                    }
122                    return Err(e);
123                }
124            };
125            self.notifiers.into_iter().for_each(|notifier| {
126                notifier.notify_collected();
127            });
128            try_join_all(
129                self.finished_jobs
130                    .into_iter()
131                    .map(|finished_job| context.finish_creating_job(finished_job)),
132            )
133            .await?;
134            for (database_id, (command, _)) in self.epoch_infos {
135                if let Some((command_ctx, enqueue_time)) = command {
136                    let duration_sec = enqueue_time.stop_and_record();
137                    Self::report_complete_event(&env, duration_sec, &command_ctx);
138                    GLOBAL_META_METRICS
139                        .last_committed_barrier_time
140                        .with_label_values(&[database_id.database_id.to_string().as_str()])
141                        .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
142                }
143            }
144            version_stats
145        };
146
147        Ok(version_stats)
148    }
149}
150
151impl CompleteBarrierTask {
152    fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
153        // Record barrier latency in event log.
154        use risingwave_pb::meta::event_log;
155        let event = event_log::EventBarrierComplete {
156            prev_epoch: command_ctx.barrier_info.prev_epoch(),
157            cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
158            duration_sec,
159            command: command_ctx
160                .command
161                .as_ref()
162                .map(|command| command.to_string())
163                .unwrap_or_else(|| "barrier".to_owned()),
164            barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
165        };
166        env.event_log_manager_ref()
167            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
168    }
169}
170
/// Outcome of a successfully joined barrier-completion task, as produced by
/// `CompletingTask::next_completed_barrier_inner`.
pub(super) struct BarrierCompleteOutput {
    #[expect(clippy::type_complexity)]
    /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
    /// Version stats yielded by the joined completion task.
    pub hummock_version_stats: HummockVersionStats,
}
177
178impl CompletingTask {
179    pub(super) fn next_completed_barrier<'a>(
180        &'a mut self,
181        periodic_barriers: &mut PeriodicBarriers,
182        checkpoint_control: &mut CheckpointControl,
183        control_stream_manager: &mut ControlStreamManager,
184        context: &Arc<impl GlobalBarrierWorkerContext>,
185        env: &MetaSrvEnv,
186    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
187        // If there is no completing barrier, try to start completing the earliest barrier if
188        // it has been collected.
189        if let CompletingTask::None = self {
190            if let Some(task) = checkpoint_control
191                .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
192            {
193                {
194                    let epochs_to_ack = task.epochs_to_ack();
195                    let context = context.clone();
196                    let env = env.clone();
197                    let join_handle =
198                        tokio::spawn(async move { task.complete_barrier(&*context, env).await });
199                    *self = CompletingTask::Completing {
200                        epochs_to_ack,
201                        join_handle,
202                    };
203                }
204            }
205        }
206
207        async move {
208            if !matches!(self, CompletingTask::Completing { .. }) {
209                return pending().await;
210            };
211            self.next_completed_barrier_inner().await
212        }
213    }
214
215    pub(super) async fn wait_completing_task(
216        &mut self,
217    ) -> MetaResult<Option<BarrierCompleteOutput>> {
218        match self {
219            CompletingTask::None => Ok(None),
220            CompletingTask::Completing { .. } => {
221                self.next_completed_barrier_inner().await.map(Some)
222            }
223            CompletingTask::Err(_) => {
224                unreachable!("should not be called on previous err")
225            }
226        }
227    }
228
229    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
230        let CompletingTask::Completing { join_handle, .. } = self else {
231            unreachable!()
232        };
233
234        {
235            {
236                let join_result: MetaResult<_> = try {
237                    join_handle
238                        .await
239                        .context("failed to join completing command")??
240                };
241                // It's important to reset the completing_command after await no matter the result is err
242                // or not, and otherwise the join handle will be polled again after ready.
243                let next_completing_command_status = if let Err(e) = &join_result {
244                    CompletingTask::Err(e.clone())
245                } else {
246                    CompletingTask::None
247                };
248                let completed_command = replace(self, next_completing_command_status);
249                let hummock_version_stats = join_result?;
250
251                must_match!(completed_command, CompletingTask::Completing {
252                    epochs_to_ack,
253                    ..
254                } => {
255                    Ok(BarrierCompleteOutput {
256                        epochs_to_ack,
257                        hummock_version_stats,
258                    })
259                })
260            }
261        }
262    }
263}