risingwave_meta/barrier/
complete_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use prometheus::HistogramTimer;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::id::JobId;
25use risingwave_common::must_match;
26use risingwave_common::util::deployment::Deployment;
27use risingwave_pb::hummock::HummockVersionStats;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    PbListFinishedSource, PbLoadFinishedSource,
30};
31use tokio::task::JoinHandle;
32
33use crate::barrier::checkpoint::CheckpointControl;
34use crate::barrier::command::CommandContext;
35use crate::barrier::context::{CreateSnapshotBackfillJobCommandInfo, GlobalBarrierWorkerContext};
36use crate::barrier::info::BarrierInfo;
37use crate::barrier::notifier::Notifier;
38use crate::barrier::progress::TrackingJob;
39use crate::barrier::rpc::ControlStreamManager;
40use crate::barrier::schedule::PeriodicBarriers;
41use crate::hummock::CommitEpochInfo;
42use crate::manager::MetaSrvEnv;
43use crate::rpc::metrics::GLOBAL_META_METRICS;
44use crate::{MetaError, MetaResult};
45
/// Tracks the barrier completion task that is currently in flight, if any.
pub(super) enum CompletingTask {
    /// No completion task is running; a new one may be started.
    None,
    /// A completion task has been spawned and its result has not been consumed yet.
    Completing {
        #[expect(clippy::type_complexity)]
        /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,

        /// The join handle of the spawned task that completes the barrier.
        /// On success the task resolves to the `HummockVersionStats` produced while
        /// completing the barrier; on failure it resolves to the `MetaError` that
        /// aborted the completion.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// The last completion task failed with this error.
    /// The payload is stored but not read by the code in this file, hence `dead_code`.
    #[expect(dead_code)]
    Err(MetaError),
}
61
/// Everything that has to be committed and post-processed when completing a barrier.
///
/// Only for checkpoint barrier. For normal barrier, there won't be a task.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Commit payload handed to `GlobalBarrierWorkerContext::commit_epoch`.
    pub(super) commit_info: CommitEpochInfo,
    /// Creating jobs to be passed to `finish_creating_job` once the commit succeeded.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Jobs to be passed to `finish_cdc_table_backfill` once the commit succeeded.
    pub(super) finished_cdc_table_backfill: Vec<JobId>,
    /// Notifiers to be informed about either successful collection or the failure.
    pub(super) notifiers: Vec<Notifier>,
    /// `database_id` -> (Some((`command_ctx`, `enqueue_time`)), vec!((`creating_job_id`, `epoch`, `create_snapshot_backfill_info`)))
    #[expect(clippy::type_complexity)]
    pub(super) epoch_infos: HashMap<
        DatabaseId,
        (
            Option<(CommandContext, HistogramTimer)>,
            Vec<(JobId, u64, Option<CreateSnapshotBackfillJobCommandInfo>)>,
        ),
    >,
    /// Source listing completion events that need `ListFinish` commands
    pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Source load completion events that need `LoadFinish` commands
    pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// IDs of the jobs whose tables have finished materialize refresh and need completion signaling
    pub(super) refresh_finished_table_job_ids: Vec<JobId>,
}
85
86impl CompleteBarrierTask {
87    #[expect(clippy::type_complexity)]
88    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
89        self.epoch_infos
90            .iter()
91            .map(|(database_id, (command_context, creating_job_epochs))| {
92                (
93                    *database_id,
94                    (
95                        command_context
96                            .as_ref()
97                            .map(|(command, _)| command.barrier_info.prev_epoch()),
98                        creating_job_epochs
99                            .iter()
100                            .map(|(job_id, epoch, _)| (*job_id, *epoch))
101                            .collect(),
102                    ),
103                )
104            })
105            .collect()
106    }
107}
108
109impl CompleteBarrierTask {
110    pub(super) async fn complete_barrier(
111        self,
112        context: &impl GlobalBarrierWorkerContext,
113        env: MetaSrvEnv,
114    ) -> MetaResult<HummockVersionStats> {
115        let result: MetaResult<HummockVersionStats> = try {
116            let wait_commit_timer = GLOBAL_META_METRICS
117                .barrier_wait_commit_latency
118                .start_timer();
119            let version_stats = context.commit_epoch(self.commit_info).await?;
120
121            // Handle list finished source IDs for refreshable batch sources
122            // Spawn this asynchronously to avoid deadlock during barrier collection
123            //
124            // This step is for fs-like refreshable-batch sources, which need to list the data first finishing loading. It guarantees finishing listing before loading.
125            // The other sources can skip this step.
126
127            if !self.list_finished_source_ids.is_empty() {
128                context
129                    .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
130                    .await?;
131            }
132
133            // Handle load finished source IDs for refreshable batch sources
134            // Spawn this asynchronously to avoid deadlock during barrier collection
135            if !self.load_finished_source_ids.is_empty() {
136                context
137                    .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
138                    .await?;
139            }
140
141            // Handle refresh finished table IDs for materialized view refresh completion
142            if !self.refresh_finished_table_job_ids.is_empty() {
143                context
144                    .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
145                    .await?;
146            }
147
148            for (database_id, (command_ctx, creating_jobs)) in self.epoch_infos {
149                if let Some((command_ctx, enqueue_time)) = command_ctx {
150                    let command_name = command_ctx.command.command_name().to_owned();
151                    context.post_collect_command(command_ctx.command).await?;
152                    let duration_sec = enqueue_time.stop_and_record();
153                    Self::report_complete_event(
154                        &env,
155                        duration_sec,
156                        &command_ctx.barrier_info,
157                        command_name,
158                    );
159                    GLOBAL_META_METRICS
160                        .last_committed_barrier_time
161                        .with_label_values(&[database_id.to_string().as_str()])
162                        .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64);
163                }
164                for (_, _, create_info) in creating_jobs {
165                    if let Some(create_info) = create_info {
166                        context
167                            .post_collect_command(create_info.into_post_collect())
168                            .await?;
169                    }
170                }
171            }
172
173            wait_commit_timer.observe_duration();
174            version_stats
175        };
176
177        let version_stats = {
178            let version_stats = match result {
179                Ok(version_stats) => version_stats,
180                Err(e) => {
181                    for notifier in self.notifiers {
182                        notifier.notify_collection_failed(e.clone());
183                    }
184                    return Err(e);
185                }
186            };
187            self.notifiers.into_iter().for_each(|notifier| {
188                notifier.notify_collected();
189            });
190            try_join_all(
191                self.finished_jobs
192                    .into_iter()
193                    .map(|finished_job| context.finish_creating_job(finished_job)),
194            )
195            .await?;
196            try_join_all(
197                self.finished_cdc_table_backfill
198                    .into_iter()
199                    .map(|job_id| context.finish_cdc_table_backfill(job_id)),
200            )
201            .await?;
202            version_stats
203        };
204
205        Ok(version_stats)
206    }
207}
208
209impl CompleteBarrierTask {
210    fn report_complete_event(
211        env: &MetaSrvEnv,
212        duration_sec: f64,
213        barrier_info: &BarrierInfo,
214        command: String,
215    ) {
216        // Record barrier latency in event log.
217        use risingwave_pb::meta::event_log;
218        let event = event_log::EventBarrierComplete {
219            prev_epoch: barrier_info.prev_epoch(),
220            cur_epoch: barrier_info.curr_epoch(),
221            duration_sec,
222            command,
223            barrier_kind: barrier_info.kind.as_str_name().to_owned(),
224        };
225        if cfg!(debug_assertions) || Deployment::current().is_ci() {
226            // Add a warning log so that debug mode / CI can observe it
227            if duration_sec > 5.0 {
228                tracing::warn!(event = ?event,"high barrier latency observed!")
229            }
230        }
231        env.event_log_manager_ref()
232            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
233    }
234}
235
/// Result handed back once a spawned barrier completion task has been joined successfully.
pub(super) struct BarrierCompleteOutput {
    #[expect(clippy::type_complexity)]
    /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
    /// The `HummockVersionStats` returned by the completion task.
    pub hummock_version_stats: HummockVersionStats,
}
242
243impl CompletingTask {
244    pub(super) fn next_completed_barrier<'a>(
245        &'a mut self,
246        periodic_barriers: &mut PeriodicBarriers,
247        checkpoint_control: &mut CheckpointControl,
248        control_stream_manager: &mut ControlStreamManager,
249        context: &Arc<impl GlobalBarrierWorkerContext>,
250        env: &MetaSrvEnv,
251    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
252        // If there is no completing barrier, try to start completing the earliest barrier if
253        // it has been collected.
254        if let CompletingTask::None = self
255            && let Some(task) = checkpoint_control
256                .next_complete_barrier_task(Some((periodic_barriers, control_stream_manager)))
257        {
258            {
259                let epochs_to_ack = task.epochs_to_ack();
260                let context = context.clone();
261                let await_tree_reg = env.await_tree_reg().clone();
262                let env = env.clone();
263
264                let fut = async move { task.complete_barrier(&*context, env).await };
265                let fut = await_tree_reg
266                    .register_derived_root("Barrier Completion Task")
267                    .instrument(fut);
268                let join_handle = tokio::spawn(fut);
269
270                *self = CompletingTask::Completing {
271                    epochs_to_ack,
272                    join_handle,
273                };
274            }
275        }
276
277        async move {
278            if !matches!(self, CompletingTask::Completing { .. }) {
279                return pending().await;
280            };
281            self.next_completed_barrier_inner().await
282        }
283    }
284
285    #[await_tree::instrument]
286    pub(super) async fn wait_completing_task(
287        &mut self,
288    ) -> MetaResult<Option<BarrierCompleteOutput>> {
289        match self {
290            CompletingTask::None => Ok(None),
291            CompletingTask::Completing { .. } => {
292                self.next_completed_barrier_inner().await.map(Some)
293            }
294            CompletingTask::Err(_) => {
295                unreachable!("should not be called on previous err")
296            }
297        }
298    }
299
300    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
301        let CompletingTask::Completing { join_handle, .. } = self else {
302            unreachable!()
303        };
304
305        {
306            {
307                let join_result: MetaResult<_> = try {
308                    join_handle
309                        .await
310                        .context("failed to join completing command")??
311                };
312                // It's important to reset the completing_command after await no matter the result is err
313                // or not, and otherwise the join handle will be polled again after ready.
314                let next_completing_command_status = if let Err(e) = &join_result {
315                    CompletingTask::Err(e.clone())
316                } else {
317                    CompletingTask::None
318                };
319                let completed_command = replace(self, next_completing_command_status);
320                let hummock_version_stats = join_result?;
321
322                must_match!(completed_command, CompletingTask::Completing {
323                    epochs_to_ack,
324                    ..
325                } => {
326                    Ok(BarrierCompleteOutput {
327                        epochs_to_ack,
328                        hummock_version_stats,
329                    })
330                })
331            }
332        }
333    }
334}