risingwave_meta/barrier/
complete_task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::id::JobId;
25use risingwave_common::must_match;
26use risingwave_common::util::deployment::Deployment;
27use risingwave_pb::hummock::HummockVersionStats;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    PbListFinishedSource, PbLoadFinishedSource,
30};
31use tokio::task::JoinHandle;
32
33use crate::barrier::checkpoint::CheckpointControl;
34use crate::barrier::command::CommandContext;
35use crate::barrier::context::GlobalBarrierWorkerContext;
36use crate::barrier::notifier::Notifier;
37use crate::barrier::progress::TrackingJob;
38use crate::barrier::rpc::ControlStreamManager;
39use crate::barrier::schedule::PeriodicBarriers;
40use crate::hummock::CommitEpochInfo;
41use crate::manager::MetaSrvEnv;
42use crate::rpc::metrics::GLOBAL_META_METRICS;
43use crate::{MetaError, MetaResult};
44
45pub(super) enum CompletingTask {
46    None,
47    Completing {
48        #[expect(clippy::type_complexity)]
49        /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
50        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
51
52        // The join handle of a spawned task that completes the barrier.
53        // The return value indicate whether there is some create streaming job command
54        // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier
55        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
56    },
57    #[expect(dead_code)]
58    Err(MetaError),
59}
60
/// Only for checkpoint barrier. For normal barrier, there won't be a task.
///
/// Bundles everything needed to commit one barrier's epoch and to run the follow-up work
/// (notifications, job finishing, metrics) once the commit has succeeded.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Handed to `commit_epoch` to commit the epoch.
    pub(super) commit_info: CommitEpochInfo,
    /// Creating jobs to finish (via `finish_creating_job`) after a successful commit.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Jobs whose CDC table backfill is to be finished (via `finish_cdc_table_backfill`)
    /// after a successful commit.
    pub(super) finished_cdc_table_backfill: Vec<JobId>,
    /// Told whether the barrier was collected or collection failed.
    pub(super) notifiers: Vec<Notifier>,
    /// `database_id` -> (Some((`command_ctx`, `enqueue_time`)), vec!((`creating_job_id`, `epoch`)))
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos:
        HashMap<DatabaseId, (Option<(CommandContext, HistogramTimer)>, Vec<(JobId, u64)>)>,
    /// Source listing completion events that need `ListFinish` commands
    pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Source load completion events that need `LoadFinish` commands
    pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// Job IDs of tables that have finished materialize refresh and need completion signaling
    pub(super) refresh_finished_table_job_ids: Vec<JobId>,
}
79
80impl CompleteBarrierTask {
81    #[expect(clippy::type_complexity)]
82    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
83        self.epoch_infos
84            .iter()
85            .map(|(database_id, (command_context, creating_job_epochs))| {
86                (
87                    *database_id,
88                    (
89                        command_context
90                            .as_ref()
91                            .map(|(command, _)| command.barrier_info.prev_epoch.value().0),
92                        creating_job_epochs.clone(),
93                    ),
94                )
95            })
96            .collect()
97    }
98}
99
100impl CompleteBarrierTask {
101    pub(super) async fn complete_barrier(
102        self,
103        context: &impl GlobalBarrierWorkerContext,
104        env: MetaSrvEnv,
105    ) -> MetaResult<HummockVersionStats> {
106        let result: MetaResult<HummockVersionStats> = try {
107            let wait_commit_timer = GLOBAL_META_METRICS
108                .barrier_wait_commit_latency
109                .start_timer();
110            let version_stats = context.commit_epoch(self.commit_info).await?;
111
112            // Handle list finished source IDs for refreshable batch sources
113            // Spawn this asynchronously to avoid deadlock during barrier collection
114            //
115            // This step is for fs-like refreshable-batch sources, which need to list the data first finishing loading. It guarantees finishing listing before loading.
116            // The other sources can skip this step.
117
118            if !self.list_finished_source_ids.is_empty() {
119                context
120                    .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
121                    .await?;
122            }
123
124            // Handle load finished source IDs for refreshable batch sources
125            // Spawn this asynchronously to avoid deadlock during barrier collection
126            if !self.load_finished_source_ids.is_empty() {
127                context
128                    .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
129                    .await?;
130            }
131
132            // Handle refresh finished table IDs for materialized view refresh completion
133            if !self.refresh_finished_table_job_ids.is_empty() {
134                context
135                    .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
136                    .await?;
137            }
138
139            for command_ctx in self
140                .epoch_infos
141                .values()
142                .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command))
143            {
144                context.post_collect_command(command_ctx).await?;
145            }
146
147            wait_commit_timer.observe_duration();
148            version_stats
149        };
150
151        let version_stats = {
152            let version_stats = match result {
153                Ok(version_stats) => version_stats,
154                Err(e) => {
155                    for notifier in self.notifiers {
156                        notifier.notify_collection_failed(e.clone());
157                    }
158                    return Err(e);
159                }
160            };
161            self.notifiers.into_iter().for_each(|notifier| {
162                notifier.notify_collected();
163            });
164            try_join_all(
165                self.finished_jobs
166                    .into_iter()
167                    .map(|finished_job| context.finish_creating_job(finished_job)),
168            )
169            .await?;
170            try_join_all(
171                self.finished_cdc_table_backfill
172                    .into_iter()
173                    .map(|job_id| context.finish_cdc_table_backfill(job_id)),
174            )
175            .await?;
176            for (database_id, (command, _)) in self.epoch_infos {
177                if let Some((command_ctx, enqueue_time)) = command {
178                    let duration_sec = enqueue_time.stop_and_record();
179                    Self::report_complete_event(&env, duration_sec, &command_ctx);
180                    GLOBAL_META_METRICS
181                        .last_committed_barrier_time
182                        .with_label_values(&[database_id.to_string().as_str()])
183                        .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
184                }
185            }
186            version_stats
187        };
188
189        Ok(version_stats)
190    }
191}
192
193impl CompleteBarrierTask {
194    fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
195        // Record barrier latency in event log.
196        use risingwave_pb::meta::event_log;
197        let event = event_log::EventBarrierComplete {
198            prev_epoch: command_ctx.barrier_info.prev_epoch(),
199            cur_epoch: command_ctx.barrier_info.curr_epoch.value().0,
200            duration_sec,
201            command: command_ctx
202                .command
203                .as_ref()
204                .map(|command| command.to_string())
205                .unwrap_or_else(|| "barrier".to_owned()),
206            barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_owned(),
207        };
208        if cfg!(debug_assertions) || Deployment::current().is_ci() {
209            // Add a warning log so that debug mode / CI can observe it
210            if duration_sec > 5.0 {
211                tracing::warn!(event = ?event,"high barrier latency observed!")
212            }
213        }
214        env.event_log_manager_ref()
215            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
216    }
217}
218
/// Output of one finished barrier completion task.
pub(super) struct BarrierCompleteOutput {
    #[expect(clippy::type_complexity)]
    /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
    /// Version stats returned by the completion task after committing the epoch.
    pub hummock_version_stats: HummockVersionStats,
}
225
226impl CompletingTask {
227    pub(super) fn next_completed_barrier<'a>(
228        &'a mut self,
229        periodic_barriers: &mut PeriodicBarriers,
230        checkpoint_control: &mut CheckpointControl,
231        control_stream_manager: &mut ControlStreamManager,
232        context: &Arc<impl GlobalBarrierWorkerContext>,
233        env: &MetaSrvEnv,
234    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
235        // If there is no completing barrier, try to start completing the earliest barrier if
236        // it has been collected.
237        if let CompletingTask::None = self
238            && let Some(task) = checkpoint_control
239                .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
240        {
241            {
242                let epochs_to_ack = task.epochs_to_ack();
243                let context = context.clone();
244                let await_tree_reg = env.await_tree_reg().clone();
245                let env = env.clone();
246
247                let fut = async move { task.complete_barrier(&*context, env).await };
248                let fut = await_tree_reg
249                    .register_derived_root("Barrier Completion Task")
250                    .instrument(fut);
251                let join_handle = tokio::spawn(fut);
252
253                *self = CompletingTask::Completing {
254                    epochs_to_ack,
255                    join_handle,
256                };
257            }
258        }
259
260        async move {
261            if !matches!(self, CompletingTask::Completing { .. }) {
262                return pending().await;
263            };
264            self.next_completed_barrier_inner().await
265        }
266    }
267
268    #[await_tree::instrument]
269    pub(super) async fn wait_completing_task(
270        &mut self,
271    ) -> MetaResult<Option<BarrierCompleteOutput>> {
272        match self {
273            CompletingTask::None => Ok(None),
274            CompletingTask::Completing { .. } => {
275                self.next_completed_barrier_inner().await.map(Some)
276            }
277            CompletingTask::Err(_) => {
278                unreachable!("should not be called on previous err")
279            }
280        }
281    }
282
283    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
284        let CompletingTask::Completing { join_handle, .. } = self else {
285            unreachable!()
286        };
287
288        {
289            {
290                let join_result: MetaResult<_> = try {
291                    join_handle
292                        .await
293                        .context("failed to join completing command")??
294                };
295                // It's important to reset the completing_command after await no matter the result is err
296                // or not, and otherwise the join handle will be polled again after ready.
297                let next_completing_command_status = if let Err(e) = &join_result {
298                    CompletingTask::Err(e.clone())
299                } else {
300                    CompletingTask::None
301                };
302                let completed_command = replace(self, next_completing_command_status);
303                let hummock_version_stats = join_result?;
304
305                must_match!(completed_command, CompletingTask::Completing {
306                    epochs_to_ack,
307                    ..
308                } => {
309                    Ok(BarrierCompleteOutput {
310                        epochs_to_ack,
311                        hummock_version_stats,
312                    })
313                })
314            }
315        }
316    }
317}