risingwave_meta/barrier/checkpoint/
control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

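/// Coordinates checkpoint barriers across databases: each database gets its own
/// [`DatabaseCheckpointControlStatus`] so it can checkpoint and recover
/// independently of the others.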
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

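    /// Poll each running database for barriers that have been fully collected
    /// and fold them into at most one [`CompleteBarrierTask`].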
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

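    /// Route a collected barrier response to the owning database, whether it is
    /// currently running or recovering.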
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier: resolve the target database and inject
    /// the barrier along with its command, if any.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
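                // Resolve the snapshot epoch of each cross-database upstream table
                // from the committed epoch of the database that owns it; fail the
                // command if no running database contains the upstream table.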
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command while the database
                // has reached the in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_ddl_progress());
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

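    /// Wait for the next state-transition event from any recovering database.
    ///
    /// A minimal usage sketch (the surrounding event loop and how each action
    /// is applied are assumptions, not part of this file):
    ///
    /// ```ignore
    /// match checkpoint_control.next_event().await {
    ///     CheckpointControlEvent::EnteringInitializing(action) => {
    ///         // reset workers have acked; move the database to initializing
    ///     }
    ///     CheckpointControlEvent::EnteringRunning(action) => {
    ///         // recovery finished; move the database back to running
    ///     }
    /// }
    /// ```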
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

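/// Checkpoint status of a single database: either `Running` with normal barrier
/// flow, or `Recovering` while being reset and re-initialized after a failure.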
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

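/// Per-database barrier metrics, with the database id as a guarded label.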
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// The state and messages of each in-flight barrier, in order,
    /// keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently being completed, as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub(super) creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the in-flight and total barrier count metrics.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    /// Enqueue a barrier command.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

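        // Barriers form a chain: the new barrier's `prev_epoch` must equal the
        // `curr_epoch` of the barrier enqueued just before it.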
        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record that the barrier at `prev_epoch` has been collected from one worker.
    /// Responses belonging to a creating (snapshot-backfill) job are forwarded to
    /// that job's control; fully collected barriers are popped later in
    /// [`Self::next_complete_barrier_task`].
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier may be injected: injection is paused while the
    /// number of in-flight barriers is at least `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after the given worker fails.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return `job_id` -> (`pinned_upstream_log_epoch`, {`upstream_mv_table_id`})
    /// for each creating job that still pins an upstream log epoch.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
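        // Phase 1: drain completable barriers of creating (snapshot-backfill)
        // jobs, folding finished jobs into the epoch they complete at.
        // Phase 2 (the `while` loop below): pop collected barriers from the head
        // of `command_ctx_queue`, notifying non-checkpoint barriers immediately
        // and folding at most one checkpoint barrier into the complete task.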
        // `Vec::new` is a `const fn` and does not allocate, so constructing it
        // eagerly is cheap.
        let mut creating_jobs_task = vec![];
        {
            // Likewise, this `Vec::new` is allocation-free until pushed to.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

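    /// Acknowledge completion of the completing barrier (if any) and the given
    /// creating-job epochs, advancing `committed_epoch`.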
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

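    /// Collect the ids of sources that finished listing or loading and of tables
    /// that finished refreshing from the barrier responses of `node`, and stash
    /// them on the pending complete task.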
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

/// The state of a barrier.
#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
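    /// A barrier stays in flight until every worker node has responded and every
    /// creating job it waits on has been collected.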
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot reschedule when creating streaming job with snapshot backfill")
                        .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in the database"
            );
            // Skip the command: an empty database has nothing for the barrier to do.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused")
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Record the new barrier's epoch on the tracing span.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}
1084}