risingwave_meta/barrier/
complete_task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::must_match;
25use risingwave_common::util::deployment::Deployment;
26use risingwave_pb::hummock::HummockVersionStats;
27use tokio::task::JoinHandle;
28
29use crate::barrier::checkpoint::CheckpointControl;
30use crate::barrier::command::CommandContext;
31use crate::barrier::context::GlobalBarrierWorkerContext;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::progress::TrackingJob;
34use crate::barrier::rpc::ControlStreamManager;
35use crate::barrier::schedule::PeriodicBarriers;
36use crate::hummock::CommitEpochInfo;
37use crate::manager::MetaSrvEnv;
38use crate::rpc::metrics::GLOBAL_META_METRICS;
39use crate::{MetaError, MetaResult};
40
41pub(super) enum CompletingTask {
42    None,
43    Completing {
44        #[expect(clippy::type_complexity)]
45        /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
46        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
47
48        // The join handle of a spawned task that completes the barrier.
49        // The return value indicate whether there is some create streaming job command
50        // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier
51        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
52    },
53    #[expect(dead_code)]
54    Err(MetaError),
55}
56
57/// Only for checkpoint barrier. For normal barrier, there won't be a task.
58#[derive(Default)]
59pub(super) struct CompleteBarrierTask {
60    pub(super) commit_info: CommitEpochInfo,
61    pub(super) finished_jobs: Vec<TrackingJob>,
62    pub(super) finished_cdc_table_backfill: Vec<TableId>,
63    pub(super) notifiers: Vec<Notifier>,
64    /// `database_id` -> (Some((`command_ctx`, `enqueue_time`)), vec!((`creating_job_id`, `epoch`)))
65    #[expect(clippy::type_complexity)]
66    pub(super) epoch_infos: HashMap<
67        DatabaseId,
68        (
69            Option<(CommandContext, HistogramTimer)>,
70            Vec<(TableId, u64)>,
71        ),
72    >,
73    /// Source IDs that have finished loading data and need `LoadFinish` commands
74    pub(super) load_finished_source_ids: Vec<u32>,
75    /// Table IDs that have finished materialize refresh and need completion signaling
76    pub(super) refresh_finished_table_ids: Vec<u32>,
77}
78
79impl CompleteBarrierTask {
80    #[expect(clippy::type_complexity)]
81    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
82        self.epoch_infos
83            .iter()
84            .map(|(database_id, (command_context, creating_job_epochs))| {
85                (
86                    *database_id,
87                    (
88                        command_context
89                            .as_ref()
90                            .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
91                        creating_job_epochs.clone(),
92                    ),
93                )
94            })
95            .collect()
96    }
97}
98
99impl CompleteBarrierTask {
100    pub(super) async fn complete_barrier(
101        self,
102        context: &impl GlobalBarrierWorkerContext,
103        env: MetaSrvEnv,
104    ) -> MetaResult<HummockVersionStats> {
105        let result: MetaResult<HummockVersionStats> = try {
106            let wait_commit_timer = GLOBAL_META_METRICS
107                .barrier_wait_commit_latency
108                .start_timer();
109            let version_stats = context.commit_epoch(self.commit_info).await?;
110
111            // Handle load finished source IDs for refreshable batch sources
112            // Spawn this asynchronously to avoid deadlock during barrier collection
113            if !self.load_finished_source_ids.is_empty() {
114                context
115                    .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
116                    .await?;
117            }
118
119            // Handle refresh finished table IDs for materialized view refresh completion
120            if !self.refresh_finished_table_ids.is_empty() {
121                context
122                    .handle_refresh_finished_table_ids(self.refresh_finished_table_ids.clone())
123                    .await?;
124            }
125
126            for command_ctx in self
127                .epoch_infos
128                .values()
129                .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
130            {
131                context.post_collect_command(command_ctx).await?;
132            }
133
134            wait_commit_timer.observe_duration();
135            version_stats
136        };
137
138        let version_stats = {
139            let version_stats = match result {
140                Ok(version_stats) => version_stats,
141                Err(e) => {
142                    for notifier in self.notifiers {
143                        notifier.notify_collection_failed(e.clone());
144                    }
145                    return Err(e);
146                }
147            };
148            self.notifiers.into_iter().for_each(|notifier| {
149                notifier.notify_collected();
150            });
151            try_join_all(
152                self.finished_jobs
153                    .into_iter()
154                    .map(|finished_job| context.finish_creating_job(finished_job)),
155            )
156            .await?;
157            try_join_all(
158                self.finished_cdc_table_backfill
159                    .into_iter()
160                    .map(|job_id| context.finish_cdc_table_backfill(job_id)),
161            )
162            .await?;
163            for (database_id, (command, _)) in self.epoch_infos {
164                if let Some((command_ctx, enqueue_time)) = command {
165                    let duration_sec = enqueue_time.stop_and_record();
166                    Self::report_complete_event(&env, duration_sec, &command_ctx);
167                    GLOBAL_META_METRICS
168                        .last_committed_barrier_time
169                        .with_label_values(&[database_id.database_id.to_string().as_str()])
170                        .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
171                }
172            }
173            version_stats
174        };
175
176        Ok(version_stats)
177    }
178}
179
180impl CompleteBarrierTask {
181    fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
182        // Record barrier latency in event log.
183        use risingwave_pb::meta::event_log;
184        let event = event_log::EventBarrierComplete {
185            prev_epoch: command_ctx.barrier_info.prev_epoch(),
186            cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
187            duration_sec,
188            command: command_ctx
189                .command
190                .as_ref()
191                .map(|command| command.to_string())
192                .unwrap_or_else(|| "barrier".to_owned()),
193            barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
194        };
195        if cfg!(debug_assertions) || Deployment::current().is_ci() {
196            // Add a warning log so that debug mode / CI can observe it
197            if duration_sec > 5.0 {
198                tracing::warn!(event = ?event,"high barrier latency observed!")
199            }
200        }
201        env.event_log_manager_ref()
202            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
203    }
204}
205
206pub(super) struct BarrierCompleteOutput {
207    #[expect(clippy::type_complexity)]
208    /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
209    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
210    pub hummock_version_stats: HummockVersionStats,
211}
212
213impl CompletingTask {
214    pub(super) fn next_completed_barrier<'a>(
215        &'a mut self,
216        periodic_barriers: &mut PeriodicBarriers,
217        checkpoint_control: &mut CheckpointControl,
218        control_stream_manager: &mut ControlStreamManager,
219        context: &Arc<impl GlobalBarrierWorkerContext>,
220        env: &MetaSrvEnv,
221    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
222        // If there is no completing barrier, try to start completing the earliest barrier if
223        // it has been collected.
224        if let CompletingTask::None = self
225            && let Some(task) = checkpoint_control
226                .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
227        {
228            {
229                let epochs_to_ack = task.epochs_to_ack();
230                let context = context.clone();
231                let await_tree_reg = env.await_tree_reg().clone();
232                let env = env.clone();
233
234                let fut = async move { task.complete_barrier(&*context, env).await };
235                let fut = await_tree_reg
236                    .register_derived_root("Barrier Completion Task")
237                    .instrument(fut);
238                let join_handle = tokio::spawn(fut);
239
240                *self = CompletingTask::Completing {
241                    epochs_to_ack,
242                    join_handle,
243                };
244            }
245        }
246
247        async move {
248            if !matches!(self, CompletingTask::Completing { .. }) {
249                return pending().await;
250            };
251            self.next_completed_barrier_inner().await
252        }
253    }
254
255    #[await_tree::instrument]
256    pub(super) async fn wait_completing_task(
257        &mut self,
258    ) -> MetaResult<Option<BarrierCompleteOutput>> {
259        match self {
260            CompletingTask::None => Ok(None),
261            CompletingTask::Completing { .. } => {
262                self.next_completed_barrier_inner().await.map(Some)
263            }
264            CompletingTask::Err(_) => {
265                unreachable!("should not be called on previous err")
266            }
267        }
268    }
269
270    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
271        let CompletingTask::Completing { join_handle, .. } = self else {
272            unreachable!()
273        };
274
275        {
276            {
277                let join_result: MetaResult<_> = try {
278                    join_handle
279                        .await
280                        .context("failed to join completing command")??
281                };
282                // It's important to reset the completing_command after await no matter the result is err
283                // or not, and otherwise the join handle will be polled again after ready.
284                let next_completing_command_status = if let Err(e) = &join_result {
285                    CompletingTask::Err(e.clone())
286                } else {
287                    CompletingTask::None
288                };
289                let completed_command = replace(self, next_completing_command_status);
290                let hummock_version_stats = join_result?;
291
292                must_match!(completed_command, CompletingTask::Completing {
293                    epochs_to_ack,
294                    ..
295                } => {
296                    Ok(BarrierCompleteOutput {
297                        epochs_to_ack,
298                        hummock_version_stats,
299                    })
300                })
301            }
302        }
303    }
304}