risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::SharedActorInfos;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers allowed in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier and inject it into the target database.
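    /// Dispatch summary (a paraphrase of the body below, not additional behavior):
    /// - with a command: route to the target database control, lazily creating it for a
    ///   plain `CreateStreamingJob`, and short-circuit `Flush`/`Pause`/`Resume` on a
    ///   database that has no graph yet;
    /// - without a command: skip databases that are unknown, recovering, or already at
    ///   the in-flight barrier limit.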
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .inflight_graph_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command when the database has reached
                // its in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

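    /// Wait for the next recovering-database event. The future captures `Option<&mut Self>`
    /// so the mutable borrow can be consumed exactly once when an event is ready; polling it
    /// again afterwards is a bug. A minimal, std-only sketch of the same pattern (names and
    /// types are illustrative, not part of this module):
    ///
    /// ```ignore
    /// use std::future::poll_fn;
    /// use std::task::Poll;
    ///
    /// async fn next_ready(queue: &mut Vec<u32>) -> u32 {
    ///     let mut this = Some(queue);
    ///     poll_fn(move |_cx| {
    ///         let this_mut = this.as_mut().expect("should not be polled after poll ready");
    ///         match this_mut.pop() {
    ///             Some(item) => {
    ///                 let _consumed = this.take().expect("checked Some");
    ///                 Poll::Ready(item)
    ///             }
    ///             None => Poll::Pending,
    ///         }
    ///     })
    ///     .await
    /// }
    /// ```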
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
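///
/// Barriers flow through three stages per database: they are enqueued into
/// `command_ctx_queue` (keyed by `prev_epoch`) on injection, leave the queue once every
/// worker has collected them, move to `completing_barrier` while their checkpoint is being
/// committed, and finally advance `committed_epoch` when the commit is acknowledged.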
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// The states and messages of in-flight barriers, in epoch order.
    /// Keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, if any.
    /// Stores its `prev_epoch`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the barrier count metrics.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    #[expect(clippy::type_complexity)]
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<JobId, (HashSet<TableId>, HashMap<FragmentId, InflightFragmentInfo>)>> {
        let mut job_ids_to_merge = HashMap::new();

        for (job_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                job_ids_to_merge.insert(
                    *job_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if job_ids_to_merge.is_empty() {
            None
        } else {
            Some(job_ids_to_merge)
        }
    }

    /// Enqueue a barrier command.
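    ///
    /// Barriers form a chain of epochs: the new command's `prev_epoch` must equal the
    /// `curr_epoch` of the last enqueued barrier. Illustrative sketch (epoch values are
    /// made up):
    ///
    /// ```ignore
    /// // queue tail:  prev_epoch = 100, curr_epoch = 101
    /// // next entry:  prev_epoch = 101, curr_epoch = 102  -> inserted under key 101
    /// ```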
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record a `BarrierCompleteResponse` for the barrier of `resp.epoch`. Responses for the
    /// database's own graph are attached to the pending barrier in `command_ctx_queue`;
    /// responses for a creating job's partial graph are forwarded to that job, possibly
    /// forcing a checkpoint so that the job can be merged back into the upstream graph.
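    ///
    /// ```ignore
    /// // Routing is decided by decoding the partial graph id of the response:
    /// match from_partial_graph_id(resp.partial_graph_id) {
    ///     None => { /* response for the database's own graph: record it on the epoch node */ }
    ///     Some(job_id) => { /* response for creating job `job_id`: forward to its control */ }
    /// }
    /// ```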
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected, i.e. the number of in-flight barriers is still
    /// below `in_flight_barrier_nums`.
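    ///
    /// ```ignore
    /// // With a limit of 2 and two barriers still waiting for collection, injection of
    /// // further command-less barriers is paused until one of them is collected.
    /// assert!(!database.can_inject_barrier(2));
    /// ```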
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after a worker failure.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barriers of creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map of creating `job_id` -> (pinned upstream log epoch, {`upstream_mv_table_id`}).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

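    /// Build the next batch of completion work: finished creating jobs are folded into the
    /// barrier that was waiting for them, non-checkpoint barriers at the front of
    /// `command_ctx_queue` whose collection has finished are popped and notified, and the
    /// first fully collected checkpoint barrier (if any) becomes `completing_barrier`.
    /// A minimal, std-only sketch of the "drain the collected prefix" step (values are
    /// illustrative):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// // prev_epoch -> is_inflight
    /// let mut queue = BTreeMap::from([(1u64, false), (2, false), (3, true)]);
    /// while let Some((_, &is_inflight)) = queue.first_key_value()
    ///     && !is_inflight
    /// {
    ///     let (epoch, _) = queue.pop_first().expect("non-empty");
    ///     // complete the barrier of `epoch` ...
    /// }
    /// assert_eq!(queue.len(), 1); // only the in-flight barrier remains
    /// ```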
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.state.inflight_graph_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    node.state.resps.iter(),
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.state.inflight_graph_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info =
                    self.state.inflight_graph_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The state of the barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
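    ///
    /// Roughly, the steps are:
    /// 1. reject commands that conflict with ongoing snapshot-backfill jobs
    ///    (cancelling a creating job, rescheduling fragments);
    /// 2. compute the next `BarrierInfo`, or notify and return early if the barrier has
    ///    nothing to do;
    /// 3. for `CreateStreamingJob`, fill backfill epochs and, for snapshot backfill,
    ///    register a `CreatingStreamingJobControl`;
    /// 4. on a command-less checkpoint barrier, synthesize a
    ///    `MergeSnapshotBackfillStreamingJobs` or `StartFragmentBackfill` command if needed;
    /// 5. apply the command to the database state, inject the barrier through the control
    ///    stream, and enqueue it to wait for collection.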
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert the newly added creating job.
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // set snapshot epoch of upstream table for snapshot backfill
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state.inflight_graph_info.shared_actor_infos.upsert(
                        self.database_id,
                        job.graph_info()
                            .values()
                            .map(|fragment| (fragment, job.job_id)),
                    );

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Collect the jobs to finish
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.state.inflight_graph_info.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing-related setup.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}