// risingwave_meta/barrier/complete_task.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::HashMap;
16use std::future::{Future, pending};
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::Context;
21use futures::future::try_join_all;
22use risingwave_common::id::JobId;
23use risingwave_common::must_match;
24use risingwave_common::util::deployment::Deployment;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::id::{DatabaseId, PartialGraphId};
27use risingwave_pb::stream_service::barrier_complete_response::{
28    PbListFinishedSource, PbLoadFinishedSource,
29};
30use tokio::task::JoinHandle;
31
32use crate::barrier::checkpoint::CheckpointControl;
33use crate::barrier::context::GlobalBarrierWorkerContext;
34use crate::barrier::info::BarrierInfo;
35use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
36use crate::barrier::progress::TrackingJob;
37use crate::barrier::rpc::from_partial_graph_id;
38use crate::barrier::schedule::PeriodicBarriers;
39use crate::hummock::CommitEpochInfo;
40use crate::manager::MetaSrvEnv;
41use crate::rpc::metrics::GLOBAL_META_METRICS;
42use crate::{MetaError, MetaResult};
43
/// State of the (at most one) in-flight barrier-completion task.
pub(super) enum CompletingTask {
    /// No completion task is currently running.
    None,
    /// A spawned task is completing a collected barrier.
    Completing {
        #[expect(clippy::type_complexity)]
        /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
        epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,

        // The join handle of a spawned task that completes the barrier.
        // On success it yields the `HummockVersionStats` returned by the
        // epoch commit performed inside the task.
        join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
    },
    /// The previous completion task failed. The payload is currently never
    /// read, only stored to mark the error state.
    #[expect(dead_code)]
    Err(MetaError),
}
59
/// Accumulated work to perform when completing a barrier.
///
/// Only for checkpoint barrier. For normal barrier, there won't be a task.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
    /// Epoch info to commit to hummock.
    pub(super) commit_info: CommitEpochInfo,
    /// Streaming jobs whose creation finished at this barrier.
    pub(super) finished_jobs: Vec<TrackingJob>,
    /// Jobs whose CDC table backfill finished at this barrier.
    pub(super) finished_cdc_table_backfill: Vec<JobId>,
    /// `partial_graph_id` -> barrier info for post-collect processing
    pub(super) epoch_infos: HashMap<PartialGraphId, PartialGraphBarrierInfo>,
    /// Source listing completion events that need `ListFinish` commands
    pub(super) list_finished_source_ids: Vec<PbListFinishedSource>,
    /// Source load completion events that need `LoadFinish` commands
    pub(super) load_finished_source_ids: Vec<PbLoadFinishedSource>,
    /// Table IDs that have finished materialize refresh and need completion signaling
    pub(super) refresh_finished_table_job_ids: Vec<JobId>,
}
75
76impl CompleteBarrierTask {
77    #[expect(clippy::type_complexity)]
78    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> {
79        let mut epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)> =
80            HashMap::new();
81        for (partial_graph_id, info) in &self.epoch_infos {
82            let (database_id, creating_job_id) = from_partial_graph_id(*partial_graph_id);
83            let epoch = info.barrier_info.prev_epoch();
84            let (database, jobs) = epochs_to_ack.entry(database_id).or_default();
85            if let Some(job_id) = creating_job_id {
86                jobs.push((job_id, epoch));
87            } else {
88                *database = Some(epoch);
89            }
90        }
91        epochs_to_ack
92    }
93}
94
95impl CompleteBarrierTask {
96    pub(super) async fn complete_barrier(
97        self,
98        context: &impl GlobalBarrierWorkerContext,
99        env: MetaSrvEnv,
100    ) -> MetaResult<HummockVersionStats> {
101        let mut notifiers = Vec::new();
102        let result: MetaResult<HummockVersionStats> = try {
103            let wait_commit_timer = GLOBAL_META_METRICS
104                .barrier_wait_commit_latency
105                .start_timer();
106            let version_stats = context.commit_epoch(self.commit_info).await?;
107
108            // Handle list finished source IDs for refreshable batch sources
109            // Spawn this asynchronously to avoid deadlock during barrier collection
110            //
111            // This step is for fs-like refreshable-batch sources, which need to list the data first finishing loading. It guarantees finishing listing before loading.
112            // The other sources can skip this step.
113
114            if !self.list_finished_source_ids.is_empty() {
115                context
116                    .handle_list_finished_source_ids(self.list_finished_source_ids.clone())
117                    .await?;
118            }
119
120            // Handle load finished source IDs for refreshable batch sources
121            // Spawn this asynchronously to avoid deadlock during barrier collection
122            if !self.load_finished_source_ids.is_empty() {
123                context
124                    .handle_load_finished_source_ids(self.load_finished_source_ids.clone())
125                    .await?;
126            }
127
128            // Handle refresh finished table IDs for materialized view refresh completion
129            if !self.refresh_finished_table_job_ids.is_empty() {
130                context
131                    .handle_refresh_finished_table_ids(self.refresh_finished_table_job_ids.clone())
132                    .await?;
133            }
134
135            for (partial_graph_id, info) in self.epoch_infos {
136                let (database_id, job_id) = from_partial_graph_id(partial_graph_id);
137                let command_name = info.post_collect_command.command_name().to_owned();
138                let elapsed_secs = info.elapsed_secs();
139                notifiers.extend(info.notifiers);
140                context
141                    .post_collect_command(info.post_collect_command)
142                    .await?;
143                if job_id.is_none() {
144                    Self::report_complete_event(
145                        &env,
146                        database_id,
147                        elapsed_secs,
148                        &info.barrier_info,
149                        command_name,
150                    );
151                    GLOBAL_META_METRICS
152                        .last_committed_barrier_time
153                        .with_label_values(&[database_id.to_string().as_str()])
154                        .set(info.barrier_info.curr_epoch.value().as_unix_secs() as i64);
155                }
156            }
157
158            wait_commit_timer.observe_duration();
159            version_stats
160        };
161
162        let version_stats = {
163            let version_stats = match result {
164                Ok(version_stats) => version_stats,
165                Err(e) => {
166                    for notifier in notifiers {
167                        notifier.notify_collection_failed(e.clone());
168                    }
169                    return Err(e);
170                }
171            };
172            notifiers.into_iter().for_each(|notifier| {
173                notifier.notify_collected();
174            });
175            try_join_all(
176                self.finished_jobs
177                    .into_iter()
178                    .map(|finished_job| context.finish_creating_job(finished_job)),
179            )
180            .await?;
181            try_join_all(
182                self.finished_cdc_table_backfill
183                    .into_iter()
184                    .map(|job_id| context.finish_cdc_table_backfill(job_id)),
185            )
186            .await?;
187            version_stats
188        };
189
190        Ok(version_stats)
191    }
192}
193
194impl CompleteBarrierTask {
195    fn report_complete_event(
196        env: &MetaSrvEnv,
197        database_id: DatabaseId,
198        duration_sec: f64,
199        barrier_info: &BarrierInfo,
200        command: String,
201    ) {
202        // Record barrier latency in event log.
203        use risingwave_pb::meta::event_log;
204        let event = event_log::EventBarrierComplete {
205            prev_epoch: barrier_info.prev_epoch(),
206            cur_epoch: barrier_info.curr_epoch(),
207            duration_sec,
208            command,
209            barrier_kind: barrier_info.kind.as_str_name().to_owned(),
210            database_id: database_id.as_raw_id(),
211        };
212        if cfg!(debug_assertions) || Deployment::current().is_ci() {
213            // Add a warning log so that debug mode / CI can observe it
214            if duration_sec > 5.0 {
215                tracing::warn!(event = ?event,"high barrier latency observed!")
216            }
217        }
218        env.event_log_manager_ref()
219            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
220    }
221}
222
/// Result of a finished barrier-completion task, handed back to the barrier worker.
pub(super) struct BarrierCompleteOutput {
    #[expect(clippy::type_complexity)]
    /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)])
    pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>,
    /// Version stats returned by the completion task's epoch commit.
    pub hummock_version_stats: HummockVersionStats,
}
229
230impl CompletingTask {
231    pub(super) fn next_completed_barrier<'a>(
232        &'a mut self,
233        periodic_barriers: &mut PeriodicBarriers,
234        checkpoint_control: &mut CheckpointControl,
235        partial_graph_manager: &mut PartialGraphManager,
236        context: &Arc<impl GlobalBarrierWorkerContext>,
237        env: &MetaSrvEnv,
238    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
239        // If there is no completing barrier, try to start completing the earliest barrier if
240        // it has been collected.
241        if let CompletingTask::None = self
242            && let Some(task) = checkpoint_control
243                .next_complete_barrier_task(periodic_barriers, partial_graph_manager)
244        {
245            {
246                let epochs_to_ack = task.epochs_to_ack();
247                let context = context.clone();
248                let await_tree_reg = env.await_tree_reg().clone();
249                let env = env.clone();
250
251                let fut = async move { task.complete_barrier(&*context, env).await };
252                let fut = await_tree_reg
253                    .register_derived_root("Barrier Completion Task")
254                    .instrument(fut);
255                let join_handle = tokio::spawn(fut);
256
257                *self = CompletingTask::Completing {
258                    epochs_to_ack,
259                    join_handle,
260                };
261            }
262        }
263
264        async move {
265            if !matches!(self, CompletingTask::Completing { .. }) {
266                return pending().await;
267            };
268            self.next_completed_barrier_inner().await
269        }
270    }
271
272    #[await_tree::instrument]
273    pub(super) async fn wait_completing_task(
274        &mut self,
275    ) -> MetaResult<Option<BarrierCompleteOutput>> {
276        match self {
277            CompletingTask::None => Ok(None),
278            CompletingTask::Completing { .. } => {
279                self.next_completed_barrier_inner().await.map(Some)
280            }
281            CompletingTask::Err(_) => {
282                unreachable!("should not be called on previous err")
283            }
284        }
285    }
286
287    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
288        let CompletingTask::Completing { join_handle, .. } = self else {
289            unreachable!()
290        };
291
292        {
293            {
294                let join_result: MetaResult<_> = try {
295                    join_handle
296                        .await
297                        .context("failed to join completing command")??
298                };
299                // It's important to reset the completing_command after await no matter the result is err
300                // or not, and otherwise the join handle will be polled again after ready.
301                let next_completing_command_status = if let Err(e) = &join_result {
302                    CompletingTask::Err(e.clone())
303                } else {
304                    CompletingTask::None
305                };
306                let completed_command = replace(self, next_completing_command_status);
307                let hummock_version_stats = join_result?;
308
309                must_match!(completed_command, CompletingTask::Completing {
310                    epochs_to_ack,
311                    ..
312                } => {
313                    Ok(BarrierCompleteOutput {
314                        epochs_to_ack,
315                        hummock_version_stats,
316                    })
317                })
318            }
319        }
320    }
321}