risingwave_meta/barrier/checkpoint/
control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::sync::atomic::AtomicU32;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use fail::fail_point;
24use itertools::Itertools;
25use risingwave_common::catalog::{DatabaseId, TableId};
26use risingwave_common::id::JobId;
27use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::independent_job::{
42    BatchRefreshJobTriggerContext, IndependentCheckpointJobControl,
43};
44use crate::barrier::checkpoint::recovery::{
45    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
46    RecoveringStateAction,
47};
48use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
49use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
50use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
51use crate::barrier::notifier::Notifier;
52use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
53use crate::barrier::progress::TrackingJob;
54use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
55use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
56use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
57use crate::barrier::{
58    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
59};
60use crate::controller::fragment::InflightFragmentInfo;
61use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
62use crate::manager::MetaSrvEnv;
63
64fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
65    let mut has_unreschedulable_scan = false;
66    visit_stream_node_cont(&fragment.nodes, |node| {
67        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
68            let scan_type = stream_scan.stream_scan_type();
69            if !scan_type.is_reschedulable(true) {
70                has_unreschedulable_scan = true;
71                return false;
72            }
73        }
74        true
75    });
76    has_unreschedulable_scan
77}
78
79fn collect_fragment_upstream_fragment_ids(
80    fragment: &InflightFragmentInfo,
81    upstream_fragment_ids: &mut HashSet<FragmentId>,
82) {
83    visit_stream_node_cont(&fragment.nodes, |node| {
84        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
85            upstream_fragment_ids.insert(merge.upstream_fragment_id);
86        }
87        true
88    });
89}
90
91use crate::model::ActorId;
92use crate::rpc::metrics::GLOBAL_META_METRICS;
93use crate::{MetaError, MetaResult};
94
/// Barrier checkpoint state for all databases.
///
/// Each database is tracked independently, either as running or as recovering
/// (see `DatabaseCheckpointControlStatus`).
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint status, keyed by database id.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; overwritten from each completed barrier output in
    /// `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight per database partial graph. Once reached, new
    /// barriers without an explicit command are skipped. Initialized from
    /// `env.opts.in_flight_barrier_nums`.
    pub(crate) in_flight_barrier_nums: usize,
}
102
103impl CheckpointControl {
104    pub fn new(env: MetaSrvEnv) -> Self {
105        Self {
106            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
107            env,
108            databases: Default::default(),
109            hummock_version_stats: Default::default(),
110        }
111    }
112
113    pub(crate) fn recover(
114        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
115        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
116        hummock_version_stats: HummockVersionStats,
117        env: MetaSrvEnv,
118    ) -> Self {
119        env.shared_actor_infos()
120            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
121        Self {
122            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
123            env,
124            databases: databases
125                .into_iter()
126                .map(|(database_id, control)| {
127                    (
128                        database_id,
129                        DatabaseCheckpointControlStatus::Running(control),
130                    )
131                })
132                .chain(failed_databases.into_iter().map(
133                    |(database_id, resetting_partial_graphs)| {
134                        (
135                            database_id,
136                            DatabaseCheckpointControlStatus::Recovering(
137                                DatabaseRecoveringState::new_resetting(
138                                    database_id,
139                                    resetting_partial_graphs,
140                                ),
141                            ),
142                        )
143                    },
144                ))
145                .collect(),
146            hummock_version_stats,
147        }
148    }
149
150    pub(crate) fn ack_completed(
151        &mut self,
152        partial_graph_manager: &mut PartialGraphManager,
153        output: BarrierCompleteOutput,
154    ) {
155        self.hummock_version_stats = output.hummock_version_stats;
156        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
157            self.databases
158                .get_mut(&database_id)
159                .expect("should exist")
160                .expect_running("should have wait for completing command before enter recovery")
161                .ack_completed(
162                    partial_graph_manager,
163                    command_prev_epoch,
164                    independent_job_epochs,
165                );
166        }
167    }
168
169    pub(crate) fn next_complete_barrier_task(
170        &mut self,
171        periodic_barriers: &mut PeriodicBarriers,
172        partial_graph_manager: &mut PartialGraphManager,
173    ) -> Option<CompleteBarrierTask> {
174        let mut task = None;
175        for database in self.databases.values_mut() {
176            let Some(database) = database.running_state_mut() else {
177                continue;
178            };
179            database.next_complete_barrier_task(
180                periodic_barriers,
181                partial_graph_manager,
182                &mut task,
183                &self.hummock_version_stats,
184            );
185        }
186        task
187    }
188
189    pub(crate) fn barrier_collected(
190        &mut self,
191        partial_graph_id: PartialGraphId,
192        collected_barrier: CollectedBarrier<'_>,
193        periodic_barriers: &mut PeriodicBarriers,
194    ) -> MetaResult<()> {
195        let (database_id, _) = from_partial_graph_id(partial_graph_id);
196        let database_status = self.databases.get_mut(&database_id).expect("should exist");
197        match database_status {
198            DatabaseCheckpointControlStatus::Running(database) => {
199                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
200            }
201            DatabaseCheckpointControlStatus::Recovering(_) => {
202                if cfg!(debug_assertions) {
203                    panic!(
204                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
205                        collected_barrier, database_id, partial_graph_id
206                    );
207                } else {
208                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
209                }
210                Ok(())
211            }
212        }
213    }
214
215    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
216        self.databases.iter().filter_map(|(database_id, database)| {
217            database.running_state().is_none().then_some(*database_id)
218        })
219    }
220
221    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
222        self.databases.iter().filter_map(|(database_id, database)| {
223            database.running_state().is_some().then_some(*database_id)
224        })
225    }
226
227    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
228        self.databases
229            .get(&database_id)
230            .and_then(|database| database.running_state())
231            .map(|database| &database.database_info)
232    }
233
234    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
235        self.databases
236            .values()
237            .any(|database| database.may_have_snapshot_backfilling_jobs())
238    }
239
240    /// return Some(failed `database_id` -> `err`)
241    pub(crate) fn handle_new_barrier(
242        &mut self,
243        new_barrier: NewBarrier,
244        partial_graph_manager: &mut PartialGraphManager,
245        worker_nodes: &HashMap<WorkerId, WorkerNode>,
246    ) -> MetaResult<()> {
247        let NewBarrier {
248            database_id,
249            command,
250            span,
251            checkpoint,
252        } = new_barrier;
253
254        if let Some((mut command, notifiers)) = command {
255            if let &mut Command::CreateStreamingJob {
256                ref mut cross_db_snapshot_backfill_info,
257                ref info,
258                ..
259            } = &mut command
260            {
261                for (table_id, snapshot_epoch) in
262                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
263                {
264                    for database in self.databases.values() {
265                        if let Some(database) = database.running_state()
266                            && database.database_info.contains_job(table_id.as_job_id())
267                        {
268                            if let Some(committed_epoch) = database.committed_epoch {
269                                *snapshot_epoch = Some(committed_epoch);
270                            }
271                            break;
272                        }
273                    }
274                    if snapshot_epoch.is_none() {
275                        let table_id = *table_id;
276                        warn!(
277                            ?cross_db_snapshot_backfill_info,
278                            ?table_id,
279                            ?info,
280                            "database of cross db upstream table not found"
281                        );
282                        let err: MetaError =
283                            anyhow!("database of cross db upstream table {} not found", table_id)
284                                .into();
285                        for notifier in notifiers {
286                            notifier.notify_start_failed(err.clone());
287                        }
288
289                        return Ok(());
290                    }
291                }
292            }
293
294            let database = match self.databases.entry(database_id) {
295                Entry::Occupied(entry) => entry
296                    .into_mut()
297                    .expect_running("should not have command when not running"),
298                Entry::Vacant(entry) => match &command {
299                    Command::CreateStreamingJob { info, job_type, .. } => {
300                        let CreateStreamingJobType::Normal = job_type else {
301                            if cfg!(debug_assertions) {
302                                panic!(
303                                    "unexpected first job of type {job_type:?} with info {info:?}"
304                                );
305                            } else {
306                                for notifier in notifiers {
307                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
308                                }
309                                return Ok(());
310                            }
311                        };
312                        let new_database = DatabaseCheckpointControl::new(
313                            database_id,
314                            self.env.shared_actor_infos().clone(),
315                        );
316                        let adder = partial_graph_manager.add_partial_graph(
317                            to_partial_graph_id(database_id, None),
318                            DatabaseCheckpointControlMetrics::new(database_id),
319                        );
320                        adder.added();
321                        entry
322                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
323                            .expect_running("just initialized as running")
324                    }
325                    Command::Flush
326                    | Command::Pause
327                    | Command::Resume
328                    | Command::DropStreamingJobs { .. }
329                    | Command::DropSubscription { .. } => {
330                        for mut notifier in notifiers {
331                            notifier.notify_started();
332                            notifier.notify_collected();
333                        }
334                        warn!(?command, "skip command for empty database");
335                        return Ok(());
336                    }
337                    Command::RescheduleIntent { .. }
338                    | Command::ReplaceStreamJob(_)
339                    | Command::SourceChangeSplit(_)
340                    | Command::Throttle { .. }
341                    | Command::CreateSubscription { .. }
342                    | Command::AlterSubscriptionRetention { .. }
343                    | Command::ConnectorPropsChange(_)
344                    | Command::Refresh { .. }
345                    | Command::ListFinish { .. }
346                    | Command::LoadFinish { .. }
347                    | Command::ResetSource { .. }
348                    | Command::ResumeBackfill { .. }
349                    | Command::InjectSourceOffsets { .. } => {
350                        if cfg!(debug_assertions) {
351                            panic!(
352                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
353                                database_id, command
354                            )
355                        } else {
356                            warn!(%database_id, ?command, "database not exist when handling command");
357                            for notifier in notifiers {
358                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
359                            }
360                            return Ok(());
361                        }
362                    }
363                },
364            };
365
366            database.handle_new_barrier(
367                Some((command, notifiers)),
368                checkpoint,
369                span,
370                partial_graph_manager,
371                &self.hummock_version_stats,
372                worker_nodes,
373            )
374        } else {
375            let database = match self.databases.entry(database_id) {
376                Entry::Occupied(entry) => entry.into_mut(),
377                Entry::Vacant(_) => {
378                    // If it does not exist in the HashMap yet, it means that the first streaming
379                    // job has not been created, and we do not need to send a barrier.
380                    return Ok(());
381                }
382            };
383            let Some(database) = database.running_state_mut() else {
384                // Skip new barrier for database which is not running.
385                return Ok(());
386            };
387            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
388                >= self.in_flight_barrier_nums
389            {
390                // Skip new barrier with no explicit command when the database should pause inject additional barrier
391                return Ok(());
392            }
393            database.handle_new_barrier(
394                None,
395                checkpoint,
396                span,
397                partial_graph_manager,
398                &self.hummock_version_stats,
399                worker_nodes,
400            )
401        }
402    }
403
404    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
405        let mut progress = HashMap::new();
406        for status in self.databases.values() {
407            let Some(database_checkpoint_control) = status.running_state() else {
408                continue;
409            };
410            // Progress of normal backfill
411            progress.extend(
412                database_checkpoint_control
413                    .database_info
414                    .gen_backfill_progress(),
415            );
416            // Progress of independent checkpoint jobs
417            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
418                if let Some(p) = job.gen_backfill_progress() {
419                    progress.insert(*job_id, p);
420                }
421            }
422        }
423        progress
424    }
425
426    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
427        let mut progress = Vec::new();
428        for status in self.databases.values() {
429            let Some(database_checkpoint_control) = status.running_state() else {
430                continue;
431            };
432            progress.extend(
433                database_checkpoint_control
434                    .database_info
435                    .gen_fragment_backfill_progress(),
436            );
437            for job in database_checkpoint_control
438                .independent_checkpoint_job_controls
439                .values()
440            {
441                progress.extend(job.gen_fragment_backfill_progress());
442            }
443        }
444        progress
445    }
446
447    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
448        let mut progress = HashMap::new();
449        for status in self.databases.values() {
450            let Some(database_checkpoint_control) = status.running_state() else {
451                continue;
452            };
453            // Progress of normal backfill
454            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
455        }
456        progress
457    }
458
459    pub(crate) fn databases_failed_at_worker_err(
460        &mut self,
461        worker_id: WorkerId,
462    ) -> impl Iterator<Item = DatabaseId> + '_ {
463        self.databases
464            .iter_mut()
465            .filter_map(
466                move |(database_id, database_status)| match database_status {
467                    DatabaseCheckpointControlStatus::Running(control) => {
468                        if !control.is_valid_after_worker_err(worker_id) {
469                            Some(*database_id)
470                        } else {
471                            None
472                        }
473                    }
474                    DatabaseCheckpointControlStatus::Recovering(state) => {
475                        if !state.is_valid_after_worker_err(worker_id) {
476                            Some(*database_id)
477                        } else {
478                            None
479                        }
480                    }
481                },
482            )
483    }
484
485    // ── Batch refresh trigger helpers (delegating to DatabaseCheckpointControl) ──
486
487    pub(crate) fn get_batch_refresh_trigger_info(
488        &self,
489        database_id: DatabaseId,
490        job_id: JobId,
491    ) -> u64 {
492        let database = self
493            .databases
494            .get(&database_id)
495            .and_then(|s| s.running_state())
496            .expect("database should be running for batch refresh trigger");
497        database.get_batch_refresh_trigger_info(job_id)
498    }
499
500    pub(crate) fn start_batch_refresh_run(
501        &mut self,
502        database_id: DatabaseId,
503        job_id: JobId,
504        context: &BatchRefreshJobTriggerContext,
505        worker_nodes: &HashMap<WorkerId, WorkerNode>,
506        actor_id_counter: &AtomicU32,
507        partial_graph_manager: &mut PartialGraphManager,
508    ) -> MetaResult<bool> {
509        let database = self
510            .databases
511            .get_mut(&database_id)
512            .and_then(|s| s.running_state_mut())
513            .expect("database should be running");
514        database.start_batch_refresh_run(
515            job_id,
516            context,
517            worker_nodes,
518            actor_id_counter,
519            partial_graph_manager,
520        )
521    }
522
523    pub(crate) fn apply_batch_refresh_fragment_infos(
524        &mut self,
525        database_id: DatabaseId,
526        job_id: JobId,
527    ) {
528        let database = self
529            .databases
530            .get_mut(&database_id)
531            .and_then(|s| s.running_state_mut())
532            .expect("database should be running");
533        let br_job = match database
534            .independent_checkpoint_job_controls
535            .get(&job_id)
536            .expect("job should exist")
537        {
538            IndependentCheckpointJobControl::BatchRefresh(job) => job,
539            _ => panic!("expected batch refresh job"),
540        };
541        if let Some(fragment_infos) = br_job.fragment_infos() {
542            database
543                .database_info
544                .shared_actor_infos
545                .upsert(database_id, fragment_infos.values().map(|f| (f, job_id)));
546        }
547    }
548}
549
/// An event produced by `CheckpointControl::next_event` for the caller to handle.
pub(crate) enum CheckpointControlEvent<'a> {
    /// Emitted when a recovering database's `poll_next_event` yields
    /// `RecoveringStateAction::EnterInitializing`.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// Emitted when a recovering database's `poll_next_event` yields
    /// `RecoveringStateAction::EnterRunning`.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
    /// A batch refresh job is idle and its upstream has advanced past the refresh interval.
    /// Carries owned values so the async handler can call into context without borrowing self.
    BatchRefreshTrigger {
        database_id: DatabaseId,
        job_id: JobId,
    },
}
560
impl CheckpointControl {
    /// Handles the reset responses of a partial graph.
    ///
    /// The database id and optional independent job id are decoded from `partial_graph_id`.
    /// * Running database with an independent job id: the job control is removed and its
    ///   `on_partial_graph_reset` is called. If the job does not exist, this panics under
    ///   `debug_assertions` and is otherwise only logged.
    /// * Running database without an independent job id: treated as unreachable.
    /// * Recovering database: the responses are forwarded to the recovering state.
    ///
    /// # Panics
    /// If the database is not tracked.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    // The reset partial graph belongs to an independent job: drop its control.
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Handles the initialization of a partial graph.
    ///
    /// For a running database, only a batch refresh independent job is expected here; its
    /// `on_log_store_initialized` is called and its result returned. For a recovering
    /// database, the recovering state is notified and `Ok(())` is returned.
    ///
    /// # Panics
    /// If the database is not tracked, if a running database receives this for its own
    /// (database-level) partial graph or a creating streaming job, or if the independent
    /// job does not exist.
    pub(crate) fn on_partial_graph_initialized(
        &mut self,
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> MetaResult<()> {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                let Some(independent_job_id) = independent_job_id else {
                    unreachable!("database partial graph should not initialize when running")
                };
                let job = database
                    .independent_checkpoint_job_controls
                    .get_mut(&independent_job_id)
                    .expect("independent job should exist");
                match job {
                    IndependentCheckpointJobControl::BatchRefresh(job) => {
                        job.on_log_store_initialized(partial_graph_manager)
                    }
                    IndependentCheckpointJobControl::CreatingStreamingJob(_) => {
                        unreachable!("creating streaming job should not initialize when running")
                    }
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
                Ok(())
            }
        }
    }

    /// Returns a future that resolves to the next event the caller has to handle.
    ///
    /// Each poll walks all databases:
    /// * Running database with a committed epoch: yields `BatchRefreshTrigger` for the first
    ///   batch refresh job whose `should_start_refresh(committed_epoch)` is true.
    /// * Recovering database: polls `poll_next_event` and maps a ready action to
    ///   `EnteringInitializing` / `EnteringRunning`.
    ///
    /// `self` is moved out of the captured `Option` on `Poll::Ready`, so the returned event
    /// can borrow `self` for `'_`. The future must not be polled after it completes.
    ///
    /// NOTE(review): the running-database check does not register a waker with `cx`. When
    /// no recovering database registers one, a refresh trigger is only noticed on a later
    /// re-poll or when this future is re-created — confirm this matches the caller's loop.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(database) => {
                        // Check if any idle batch refresh job should start a refresh run.
                        if let Some(committed_epoch) = database.committed_epoch {
                            for (job_id, job) in &database.independent_checkpoint_job_controls {
                                if let IndependentCheckpointJobControl::BatchRefresh(br_job) = job
                                    && br_job.should_start_refresh(committed_epoch)
                                {
                                    let job_id = *job_id;
                                    // Mark the future as completed before returning Ready.
                                    let _ = this.take().expect("checked Some");
                                    return Poll::Ready(
                                        CheckpointControlEvent::BatchRefreshTrigger {
                                            database_id,
                                            job_id,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            // Take `self` out so the returned action can hold its `'_` borrow.
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
688
/// Checkpoint status of a single database.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// The database is running and handles barriers and commands.
    Running(DatabaseCheckpointControl),
    /// The database is recovering. Barriers without a command are skipped for it, and
    /// `CheckpointControl::next_event` polls it for recovery progress.
    Recovering(DatabaseRecoveringState),
}
693
694impl DatabaseCheckpointControlStatus {
695    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
696        match self {
697            DatabaseCheckpointControlStatus::Running(state) => Some(state),
698            DatabaseCheckpointControlStatus::Recovering(_) => None,
699        }
700    }
701
702    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
703        match self {
704            DatabaseCheckpointControlStatus::Running(state) => Some(state),
705            DatabaseCheckpointControlStatus::Recovering(_) => None,
706        }
707    }
708
709    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
710        self.running_state()
711            .map(|database| {
712                database
713                    .independent_checkpoint_job_controls
714                    .values()
715                    .any(|job| job.is_snapshot_backfilling())
716            })
717            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
718    }
719
720    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
721        match self {
722            DatabaseCheckpointControlStatus::Running(state) => state,
723            DatabaseCheckpointControlStatus::Recovering(_) => {
724                panic!("should be at running: {}", reason)
725            }
726        }
727    }
728}
729
/// Per-database barrier metrics; every handle is labeled with the database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Barrier latency samples, in seconds.
    barrier_latency: LabelGuardedHistogram,
    /// Number of in-flight barriers.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Number of in-flight plus collected barriers.
    all_barrier_nums: LabelGuardedIntGauge,
}
735
736impl DatabaseCheckpointControlMetrics {
737    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
738        let database_id_str = database_id.to_string();
739        let barrier_latency = GLOBAL_META_METRICS
740            .barrier_latency
741            .with_guarded_label_values(&[&database_id_str]);
742        let in_flight_barrier_nums = GLOBAL_META_METRICS
743            .in_flight_barrier_nums
744            .with_guarded_label_values(&[&database_id_str]);
745        let all_barrier_nums = GLOBAL_META_METRICS
746            .all_barrier_nums
747            .with_guarded_label_values(&[&database_id_str]);
748        Self {
749            barrier_latency,
750            in_flight_barrier_nums,
751            all_barrier_nums,
752        }
753    }
754}
755
756impl PartialGraphStat for DatabaseCheckpointControlMetrics {
757    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
758        self.barrier_latency.observe(barrier_latency_secs);
759    }
760
761    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
762        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
763        self.all_barrier_nums
764            .set((inflight_barrier_num + collected_barrier_num) as _);
765    }
766}
767
/// Controls the concurrent execution of commands.
///
/// Per-database state used to inject barriers, track their collection and completion,
/// and manage the database's independent checkpoint jobs (creating streaming jobs and
/// batch refresh jobs), each of which runs on its own partial graph.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database itself (`to_partial_graph_id(database_id, None)`).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// For each enqueued epoch, the independent jobs expected to finish at that epoch.
    /// Each finished job contributes its responses and tracking job.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// The barrier that is completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// Last epoch acked as completed on the database's own partial graph.
    /// `None` after `new` until the first ack; initialized by `recovery`.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Independent checkpoint jobs of this database, keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
785
786impl DatabaseCheckpointControl {
787    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
788        Self {
789            database_id,
790            partial_graph_id: to_partial_graph_id(database_id, None),
791            state: BarrierWorkerState::new(),
792            finishing_jobs_collector: BarrierItemCollector::new(),
793            completing_barrier: None,
794            committed_epoch: None,
795            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
796            independent_checkpoint_job_controls: Default::default(),
797        }
798    }
799
800    pub(crate) fn recovery(
801        database_id: DatabaseId,
802        state: BarrierWorkerState,
803        committed_epoch: u64,
804        database_info: InflightDatabaseInfo,
805        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
806    ) -> Self {
807        Self {
808            database_id,
809            partial_graph_id: to_partial_graph_id(database_id, None),
810            state,
811            finishing_jobs_collector: BarrierItemCollector::new(),
812            completing_barrier: None,
813            committed_epoch: Some(committed_epoch),
814            database_info,
815            independent_checkpoint_job_controls,
816        }
817    }
818
819    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
820        !self.database_info.contains_worker(worker_id as _)
821            && self
822                .independent_checkpoint_job_controls
823                .values()
824                .all(|job| {
825                    job.fragment_infos()
826                        .map(|fragment_infos| {
827                            !InflightFragmentInfo::contains_worker(
828                                fragment_infos.values(),
829                                worker_id,
830                            )
831                        })
832                        .unwrap_or(true)
833                })
834    }
835
836    /// Enqueue a barrier command
837    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
838        let prev_epoch = epoch.prev;
839        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
840        if !independent_jobs_to_wait.is_empty() {
841            self.finishing_jobs_collector
842                .enqueue(epoch, independent_jobs_to_wait, ());
843        }
844    }
845
846    /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
847    /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
848    fn barrier_collected(
849        &mut self,
850        partial_graph_id: PartialGraphId,
851        collected_barrier: CollectedBarrier<'_>,
852        periodic_barriers: &mut PeriodicBarriers,
853    ) -> MetaResult<()> {
854        let prev_epoch = collected_barrier.epoch.prev;
855        tracing::trace!(
856            prev_epoch,
857            partial_graph_id = %partial_graph_id,
858            "barrier collected"
859        );
860        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
861        assert_eq!(self.database_id, database_id);
862        if let Some(independent_job_id) = independent_job_id {
863            let job = self
864                .independent_checkpoint_job_controls
865                .get_mut(&independent_job_id)
866                .expect("should exist");
867            let should_force_checkpoint = job.collect(collected_barrier);
868            if should_force_checkpoint {
869                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
870            }
871        }
872        Ok(())
873    }
874}
875
876impl DatabaseCheckpointControl {
877    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
878    fn collect_backfill_pinned_upstream_log_epoch(
879        &self,
880    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
881        self.independent_checkpoint_job_controls
882            .iter()
883            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
884            .collect()
885    }
886
887    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
888        &self,
889    ) -> Vec<(FragmentId, FragmentId)> {
890        let mut no_shuffle_relations = Vec::new();
891        for fragment in self.database_info.fragment_infos() {
892            let downstream_fragment_id = fragment.fragment_id;
893            visit_stream_node_cont(&fragment.nodes, |node| {
894                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
895                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
896                {
897                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
898                }
899                true
900            });
901        }
902
903        for job in self.independent_checkpoint_job_controls.values() {
904            if let Some(fragment_infos) = job.fragment_infos() {
905                for fragment_info in fragment_infos.values() {
906                    let downstream_fragment_id = fragment_info.fragment_id;
907                    visit_stream_node_cont(&fragment_info.nodes, |node| {
908                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
909                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
910                        {
911                            no_shuffle_relations
912                                .push((merge.upstream_fragment_id, downstream_fragment_id));
913                        }
914                        true
915                    });
916                }
917            }
918        }
919        no_shuffle_relations
920    }
921
922    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
923        &self,
924    ) -> MetaResult<HashSet<JobId>> {
925        let mut initial_blocked_fragment_ids = HashSet::new();
926        for job in self.independent_checkpoint_job_controls.values() {
927            if let Some(fragment_infos) = job.fragment_infos() {
928                for fragment_info in fragment_infos.values() {
929                    if fragment_has_online_unreschedulable_scan(fragment_info) {
930                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
931                        collect_fragment_upstream_fragment_ids(
932                            fragment_info,
933                            &mut initial_blocked_fragment_ids,
934                        );
935                    }
936                }
937            }
938        }
939
940        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
941        if !initial_blocked_fragment_ids.is_empty() {
942            let no_shuffle_relations =
943                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
944            let (forward_edges, backward_edges) =
945                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
946            let initial_blocked_fragment_ids: Vec<_> =
947                initial_blocked_fragment_ids.iter().copied().collect();
948            for ensemble in find_no_shuffle_graphs(
949                &initial_blocked_fragment_ids,
950                &forward_edges,
951                &backward_edges,
952            )? {
953                blocked_fragment_ids.extend(ensemble.fragments());
954            }
955        }
956
957        let mut blocked_job_ids = HashSet::new();
958        blocked_job_ids.extend(
959            blocked_fragment_ids
960                .into_iter()
961                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
962        );
963        Ok(blocked_job_ids)
964    }
965
966    fn collect_reschedule_blocked_job_ids(
967        &self,
968        reschedules: &HashMap<FragmentId, Reschedule>,
969        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
970        blocked_job_ids: &HashSet<JobId>,
971    ) -> HashSet<JobId> {
972        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
973        affected_fragment_ids.extend(fragment_actors.keys().copied());
974        for reschedule in reschedules.values() {
975            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
976            affected_fragment_ids.extend(
977                reschedule
978                    .upstream_fragment_dispatcher_ids
979                    .iter()
980                    .map(|(fragment_id, _)| *fragment_id),
981            );
982        }
983
984        affected_fragment_ids
985            .into_iter()
986            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
987            .filter(|job_id| blocked_job_ids.contains(job_id))
988            .collect()
989    }
990
    /// Gather the next batch of work that is ready to commit into `task`.
    ///
    /// Three kinds of work are considered:
    /// 1. Independent checkpoint jobs. These are handled only once `committed_epoch` is set.
    ///    Each job is asked to start completing its next barrier.
    ///    - A creating streaming job that reaches its finish epoch is removed from
    ///      `independent_checkpoint_job_controls`, and its partial graph is removed from
    ///      `partial_graph_manager`. Its responses and tracking job go to
    ///      `finishing_jobs_collector`, to be committed with the database barrier of the
    ///      same epoch.
    ///    - Any other completion, including batch refresh jobs, is committed as an epoch
    ///      info keyed by the job's partial graph id. A batch refresh completion may also
    ///      add a finished tracking job.
    /// 2. The database's own partial graph. `start_completing` is called with an upper bound
    ///    that excludes the first epoch still waiting in `finishing_jobs_collector`. If it
    ///    yields a barrier, its commit info is added to `task`, together with the finished
    ///    jobs collected at exactly that epoch. The barrier is recorded in
    ///    `completing_barrier`.
    /// 3. Otherwise, a checkpoint may be forced on the next barrier. This happens when all of
    ///    the following hold:
    ///    - the `start_completing` callback was invoked (tracked by `observed_non_checkpoint`);
    ///    - the database has pending finished jobs;
    ///    - no checkpoint barrier is pending.
    ///
    ///    Presumably this lets the pending finished jobs get committed.
    ///
    /// `task` is only created (`get_or_insert_default`) when there is something to put in it.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `vec![]` expands to `Vec::new`, which is a const fn and does not allocate, so it is cheap when unused.
        let mut independent_jobs_task = vec![];
        // Independent jobs are only completed once the database has a committed epoch.
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn and does not allocate, so it is cheap when unused.
            let mut finished_jobs = Vec::new();
            // `prev` of the first in-flight barrier on the database's own partial graph, if any.
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                match job {
                    IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
                        if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
                            .start_completing(
                                partial_graph_manager,
                                min_upstream_inflight_barrier,
                                committed_epoch,
                            )
                        {
                            let resps = resps.into_values().collect_vec();
                            if is_finish_epoch {
                                // The finish epoch carries no notifiers. Its responses are
                                // committed later with the database barrier of the same epoch.
                                assert!(info.notifiers.is_empty());
                                finished_jobs.push((*job_id, epoch, resps));
                                continue;
                            };
                            independent_jobs_task.push((*job_id, epoch, resps, info));
                        }
                    }
                    IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
                        if let Some((epoch, resps, info, tracking_job)) =
                            batch_refresh_job.start_completing(partial_graph_manager)
                        {
                            let resps = resps.into_values().collect_vec();
                            if let Some(tracking_job) = tracking_job {
                                let task = task.get_or_insert_default();
                                task.finished_jobs.push(tracking_job);
                            }
                            independent_jobs_task.push((*job_id, epoch, resps, info));
                        }
                    }
                }
            }
            if !finished_jobs.is_empty() {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
                    creating_streaming_job,
                )) = self.independent_checkpoint_job_controls.remove(&job_id)
                else {
                    panic!("finished job {job_id} should be a creating streaming job");
                };
                let tracking_job = creating_streaming_job.into_tracking_job();
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        // Set when `start_completing` invokes the callback below.
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Do not complete the database graph at or beyond the first epoch that is still
        // waiting for finishing independent jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            |_, resps, post_collect_command| {
                // Barriers passed here are not returned for commit. Their refresh info and
                // post-collect commands are still applied.
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Jobs that finished at exactly this epoch are committed together with it. The
            // assert checks that no collected entry precedes the completing epoch.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Pending finished jobs exist, but no checkpoint barrier is on the way: request one.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        // Completions of independent jobs are committed as separate per-job epoch infos.
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }
1144
1145    fn ack_completed(
1146        &mut self,
1147        partial_graph_manager: &mut PartialGraphManager,
1148        command_prev_epoch: Option<u64>,
1149        independent_job_epochs: Vec<(JobId, u64)>,
1150    ) {
1151        {
1152            if let Some(prev_epoch) = self.completing_barrier.take() {
1153                assert_eq!(command_prev_epoch, Some(prev_epoch));
1154                self.committed_epoch = Some(prev_epoch);
1155                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
1156            } else {
1157                assert_eq!(command_prev_epoch, None);
1158            };
1159            for (job_id, epoch) in independent_job_epochs {
1160                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
1161                    job.ack_completed(partial_graph_manager, epoch);
1162                }
1163                // If the job is not found, it was dropped and already removed
1164                // by `on_partial_graph_reset` while the completing task was running.
1165            }
1166        }
1167    }
1168
1169    fn handle_refresh_table_info(
1170        &self,
1171        task: &mut Option<CompleteBarrierTask>,
1172        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
1173    ) {
1174        let list_finished_info = resps
1175            .values()
1176            .flat_map(|resp| resp.list_finished_sources.clone())
1177            .collect::<Vec<_>>();
1178        if !list_finished_info.is_empty() {
1179            let task = task.get_or_insert_default();
1180            task.list_finished_source_ids.extend(list_finished_info);
1181        }
1182
1183        let load_finished_info = resps
1184            .values()
1185            .flat_map(|resp| resp.load_finished_sources.clone())
1186            .collect::<Vec<_>>();
1187        if !load_finished_info.is_empty() {
1188            let task = task.get_or_insert_default();
1189            task.load_finished_source_ids.extend(load_finished_info);
1190        }
1191
1192        let refresh_finished_table_ids: Vec<JobId> = resps
1193            .values()
1194            .flat_map(|resp| {
1195                resp.refresh_finished_tables
1196                    .iter()
1197                    .map(|table_id| table_id.as_job_id())
1198            })
1199            .collect::<Vec<_>>();
1200        if !refresh_finished_table_ids.is_empty() {
1201            let task = task.get_or_insert_default();
1202            task.refresh_finished_table_job_ids
1203                .extend(refresh_finished_table_ids);
1204        }
1205    }
1206}
1207
1208impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    ///
    /// `command`, if any, comes with the notifiers to report to. Before a barrier is injected,
    /// the command may be rejected (its notifiers get `notify_start_failed`) or resolved
    /// without a barrier:
    /// - A multi-job `DropStreamingJobs` that includes an independent checkpoint job is
    ///   rejected.
    /// - A single-job drop of an independent job is delegated to `job.drop`. No barrier is
    ///   injected if that reports the job as dropped.
    /// - A multi-job `Throttle` that includes an independent checkpoint job is rejected.
    /// - A resolved `RescheduleIntent` is rejected if it touches jobs blocked by in-flight
    ///   independent jobs with unreschedulable backfill fragments.
    /// - When `database_info` is empty and the command is not `CreateStreamingJob`, the
    ///   notifiers are immediately reported as started and collected.
    /// - Creating a snapshot-backfill or batch-refresh job is rejected while paused.
    ///
    /// Otherwise the next barrier info is taken from `self.state` and the command is applied
    /// via `apply_command`. The epoch is then enqueued together with the independent jobs to
    /// wait for.
    ///
    /// # Errors
    /// Propagates errors from `collect_reschedule_blocked_jobs_for_independent_jobs_inflight`
    /// and from `apply_command`. On an `apply_command` error, every notifier is notified of
    /// the failure first.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        // An independent checkpoint job can only be dropped by a single-job drop command.
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .independent_checkpoint_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(job) = self
                    .independent_checkpoint_job_controls
                    .get_mut(job_to_drop)
            {
                // The job handles the drop itself. No barrier is injected if it reports the
                // job as dropped.
                let dropped = job.drop(&mut notifiers, partial_graph_manager);
                if dropped {
                    return Ok(());
                }
            }
        }

        // Rate limits of independent checkpoint jobs cannot be changed together with other jobs.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(independent_job_id) = jobs
                .iter()
                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %independent_job_id,
                "ignore multi-job throttle command on independent checkpoint job"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot alter rate limit for independent checkpoint job with other jobs, \
                                the original rate limit will be kept during recovery."
                    )
                    .into(),
                );
            }
            return Ok(());
        };

        // Reject reschedules that touch jobs blocked by in-flight independent jobs.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.independent_checkpoint_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                            .into(),
                    );
                }
                return Ok(());
            }
        }

        // With an empty database, only `CreateStreamingJob` needs a barrier. Everything else
        // is reported as started and collected right away.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.independent_checkpoint_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot-backfill and batch-refresh jobs cannot be created while the database is paused.
        if let Some(Command::CreateStreamingJob {
            job_type:
                CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            worker_nodes,
        ) {
            Ok(info) => {
                // On success `apply_command` must have taken over every notifier.
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Record the in-flight barrier.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
1391
1392    // ── Batch refresh trigger helpers ────────────────────────────────────────
1393
1394    /// Get the last committed epoch for a batch refresh job.
1395    pub(crate) fn get_batch_refresh_trigger_info(&self, job_id: JobId) -> u64 {
1396        let job = self
1397            .independent_checkpoint_job_controls
1398            .get(&job_id)
1399            .expect("batch refresh job should exist");
1400        match job {
1401            IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job
1402                .last_committed_epoch()
1403                .expect("idle job must have a last_committed_epoch"),
1404            _ => panic!("job {} should be a batch refresh job", job_id),
1405        }
1406    }
1407
1408    /// Whether the batch refresh job already has its cached context populated.
1409    /// Start a batch refresh logstore consumption run.
1410    /// Returns true if a run was started, false if no log epochs to consume.
1411    pub(crate) fn start_batch_refresh_run(
1412        &mut self,
1413        job_id: JobId,
1414        context: &BatchRefreshJobTriggerContext,
1415        worker_nodes: &HashMap<WorkerId, WorkerNode>,
1416        actor_id_counter: &AtomicU32,
1417        partial_graph_manager: &mut PartialGraphManager,
1418    ) -> MetaResult<bool> {
1419        let job = self
1420            .independent_checkpoint_job_controls
1421            .get_mut(&job_id)
1422            .expect("batch refresh job should exist");
1423        match job {
1424            IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job.start_refresh_run(
1425                context,
1426                worker_nodes,
1427                actor_id_counter,
1428                partial_graph_manager,
1429            ),
1430            _ => panic!("job {} should be a batch refresh job", job_id),
1431        }
1432    }
1433}