risingwave_meta/barrier/checkpoint/independent_job/batch_refresh_job/
mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Batch refresh job checkpoint control for snapshot-only materialized views.
//!
//! The control lives permanently in `DatabaseCheckpointControl.independent_checkpoint_job_controls`
//! as an `IndependentCheckpointJobControl::BatchRefresh` variant for its entire lifetime.
//!
//! Lifecycle for the first run (snapshot only):
//!   DDL → `ConsumingSnapshot` → `FinishingSnapshot` (final checkpoint + stop barrier injected)
//!   → stop barrier committed → `Idle`
22
23use std::collections::{HashMap, HashSet};
24use std::mem::{replace, take};
25use std::sync::atomic::AtomicU32;
26
27use anyhow::anyhow;
28use itertools::Itertools;
29use risingwave_common::catalog::{DatabaseId, TableId};
30use risingwave_common::id::JobId;
31use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
32use risingwave_common::util::epoch::{Epoch, EpochPair};
33use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
34use risingwave_pb::common::WorkerNode;
35use risingwave_pb::ddl_service::PbBackfillType;
36use risingwave_pb::hummock::HummockVersionStats;
37use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
38use risingwave_pb::stream_plan::barrier::PbBarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation;
40use risingwave_pb::stream_plan::{AddMutation, StartFragmentBackfillMutation, StopMutation};
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{debug, info};
43
44use crate::MetaResult;
45use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
46use crate::barrier::command::PostCollectCommand;
47use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
48use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
49use crate::barrier::info::BarrierInfo;
50use crate::barrier::notifier::Notifier;
51use crate::barrier::partial_graph::{
52    CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphStat,
53};
54use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
55use crate::barrier::rpc::to_partial_graph_id;
56use crate::barrier::{
57    BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
58};
59use crate::controller::fragment::InflightFragmentInfo;
60use crate::controller::scale::{
61    ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
62    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
63};
64use crate::model::{
65    FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate, StreamingJobModelContextExt,
66};
67use crate::rpc::metrics::GLOBAL_META_METRICS;
68use crate::stream::ExtendedFragmentBackfillOrder;
69
70// ── Public types ──────────────────────────────────────────────────────────────
71
/// Logical fragment metadata for a batch refresh job.
///
/// Contains only catalog-level information: fragment structure, stream plan nodes,
/// distribution, and downstream relations. No actor IDs, no worker placement.
///
/// Its two fields are the uniform input of `render_actors_and_build_job_info()`, which
/// performs actor rendering (ID allocation, worker placement, vnode assignment) internally.
/// No-shuffle ensembles are derived from `downstreams` there; every fragment that is not part
/// of a `NoShuffle` relation is rendered as its own single-fragment ensemble.
#[derive(Debug)]
pub(crate) struct BatchRefreshLogicalFragments {
    /// Logical fragments of this job. Keyed by `fragment_id`.
    ///
    /// Rendering indexes this map directly, so every fragment that appears in a `NoShuffle`
    /// relation of `downstreams` must be present here.
    pub fragments: HashMap<FragmentId, LoadedFragment>,
    /// Internal downstream relations (intra-job only; no upstream edges).
    pub downstreams: FragmentDownstreamRelation,
}
87
/// Result of the unified actor rendering for a batch refresh job.
///
/// Produced by `render_actors_and_build_job_info()` and consumed by both
/// `new()` (create) and `recover()`.
#[derive(Debug)]
pub(crate) struct BatchRefreshRenderResult {
    /// Per-fragment inflight info, including the freshly rendered actors, their worker
    /// placement and vnode bitmaps.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Rendered actors grouped by worker (from `InflightFragmentInfo::actor_ids_to_collect`);
    /// passed as the actor set of every barrier injected into the job's partial graph.
    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of all fragments (from `InflightFragmentInfo::existing_table_ids`).
    pub state_table_ids: HashSet<TableId>,
    /// Actors to build, collected from the job's internal edges. Attached to the first barrier
    /// on the create path and handed to the partial-graph recoverer on the recovery path.
    pub actors_to_create: StreamJobActorsToCreate,
}
99
100// ── Status ────────────────────────────────────────────────────────────────────
101
/// Lifecycle state of a batch refresh job.
///
/// `new()` starts a job in `ConsumingSnapshot`; `recover()` starts it in either `Idle`
/// (snapshot already committed) or `ConsumingSnapshot`.
#[derive(Debug)]
enum BatchRefreshJobStatus {
    /// The job is consuming upstream snapshot.
    ///
    /// Once snapshot consumption finishes, the final checkpoint + stop barriers are injected
    /// and the status transitions to `FinishingSnapshot`.
    ConsumingSnapshot {
        /// Physical time from which the prev epoch of the next fake barrier is derived.
        /// Starts at 0 on create and at the committed epoch's physical time on recovery;
        /// passed mutably to `new_fake_barrier` (presumably advanced there — confirm).
        prev_epoch_fake_physical_time: u64,
        /// Hummock version stats cloned from the caller at construction time.
        version_stats: HummockVersionStats,
        /// Backfill progress tracker. When it reports `is_finished()`, the final checkpoint
        /// and stop barriers are injected; it also yields pending backfill nodes that are
        /// started through a `StartFragmentBackfill` mutation.
        create_mview_tracker: CreateMviewProgressTracker,
        /// `curr_epoch` of the final checkpoint barrier (and `prev_epoch` of the stop barrier).
        /// Always set from the same value as the control's own `snapshot_epoch`.
        snapshot_epoch: u64,
        /// Rendered fragments of the job; their actors are the ones listed in the stop mutation.
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        /// Epochs of fake non-checkpoint barriers not yet covered by a checkpoint; drained into
        /// the `BarrierKind::Checkpoint` epoch list of the final checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
        /// Actors grouped by worker; the actor set passed with every injected barrier.
        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
        /// State tables of the job, passed with every injected barrier.
        state_table_ids: HashSet<TableId>,
    },
    /// The job has finished consuming the snapshot.
    ///
    /// The final checkpoint barrier (at `snapshot_epoch`) and the stop barrier have been
    /// injected. Once the stop epoch is committed the job transitions to `Idle`.
    /// The committed epoch is expected to be the snapshot epoch when the snapshot
    /// consumption finishes.
    FinishingSnapshot {
        /// Produced by `CreateMviewProgressTracker::into_tracking_job` when consumption
        /// finishes. Presumably an `Option` so it can be taken exactly once on commit — confirm.
        tracking_job: Option<TrackingJob>,
        /// Fragments carried over from `ConsumingSnapshot`.
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    },
    /// The job is idle, waiting for the next trigger. No partial graph is held.
    Idle,
    /// The partial graph is being reset (only for drop).
    ///
    /// NOTE(review): `notifiers` are presumably notified once the reset completes — confirm.
    Resetting { notifiers: Vec<Notifier> },
}
133
134// ── Complete type ─────────────────────────────────────────────────────────────
135
136// ── Main checkpoint control ───────────────────────────────────────────────────
137
/// Self-contained checkpoint control for a batch refresh MV.
///
/// Unlike `CreatingStreamingJobControl`, this struct handles the full lifecycle
/// (snapshot → idle → re-run → idle → ...). Both types are stored together in
/// `DatabaseCheckpointControl.independent_checkpoint_job_controls` as
/// `IndependentCheckpointJobControl` variants.
#[derive(Debug)]
pub(crate) struct BatchRefreshJobCheckpointControl {
    /// The streaming job this control belongs to.
    job_id: JobId,
    /// The job's own partial graph, computed as `to_partial_graph_id(database_id, Some(job_id))`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables supplied at construction. Presumably the tables the snapshot backfill
    /// reads from — TODO confirm against callers.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch at which the final checkpoint barrier is injected. On recovery, a committed
    /// epoch `>= snapshot_epoch` means the snapshot already completed.
    snapshot_epoch: u64,

    /// `None` for a freshly created job, `Some(committed_epoch)` after recovery. Presumably
    /// advanced as epochs are committed (not visible here) — confirm.
    max_committed_epoch: Option<u64>,
    /// Current lifecycle state.
    status: BatchRefreshJobStatus,
}
154
155// ── Unified actor rendering ───────────────────────────────────────────────────
156
157impl BatchRefreshJobCheckpointControl {
158    /// Render actors for a batch refresh job from logical metadata only.
159    ///
160    /// Performs the full pipeline:
161    /// 1. Derive no-shuffle ensembles from `downstreams`
162    /// 2. Render actor assignments (ID allocation, worker placement, vnode bitmap)
163    /// 3. Build `StreamActor` structs
164    /// 4. Build internal-only edges (no upstream dispatcher edges)
165    /// 5. Produce `fragment_infos`, `node_actors`, `state_table_ids`, `actors_to_create`
166    ///
167    /// Shared by both the DDL create path and the recovery path.
168    pub(crate) fn render_actors_and_build_job_info(
169        fragments: &HashMap<FragmentId, LoadedFragment>,
170        downstreams: &FragmentDownstreamRelation,
171        definition: &str,
172        // Actor rendering context:
173        actor_id_generator: &AtomicU32,
174        worker_nodes: &HashMap<WorkerId, WorkerNode>,
175        database_resource_group: &str,
176        streaming_job_model: &streaming_job::Model,
177        // Edge building context:
178        partial_graph_id: PartialGraphId,
179    ) -> MetaResult<BatchRefreshRenderResult> {
180        // Step 1: Derive no-shuffle ensembles from downstreams.
181        let ensembles = Self::resolve_ensembles(fragments, downstreams)?;
182
183        // Step 2: Render actor assignments for each ensemble.
184        let mut actor_assignments: HashMap<
185            FragmentId,
186            HashMap<ActorId, (WorkerId, Option<risingwave_common::bitmap::Bitmap>)>,
187        > = HashMap::new();
188
189        for ensemble in &ensembles {
190            // All fragments are new (batch refresh has no existing upstream fragments).
191            let first_component = ensemble
192                .component_fragments()
193                .next()
194                .expect("ensemble must have at least one component");
195            let fragment = &fragments[&first_component];
196            let distribution_type = fragment.distribution_type;
197            let vnode_count = fragment.vnode_count;
198
199            // Assert all component fragments share the same vnode count.
200            for fid in ensemble.component_fragments() {
201                let f = &fragments[&fid];
202                assert_eq!(
203                    vnode_count, f.vnode_count,
204                    "fragments {} and {} in same ensemble have different vnode counts",
205                    first_component, fid,
206                );
207            }
208
209            let entry_fragment_parallelism = ensemble
210                .entry_fragments()
211                .map(|fid| fragments[&fid].parallelism.clone())
212                .dedup()
213                .exactly_one()
214                .map_err(|_| {
215                    anyhow!(
216                        "entry fragments have inconsistent parallelism settings in batch refresh job"
217                    )
218                })?;
219
220            let actor_template = EnsembleActorTemplate::render_new(
221                streaming_job_model,
222                worker_nodes,
223                entry_fragment_parallelism,
224                database_resource_group.to_owned(),
225                distribution_type,
226                vnode_count,
227            )?;
228
229            for fid in ensemble.component_fragments() {
230                let f = &fragments[&fid];
231                let aligner =
232                    ComponentFragmentAligner::new_persistent(&actor_template, actor_id_generator);
233                let assignments = aligner.align_component_actor(f.distribution_type);
234                actor_assignments.insert(fid, assignments);
235            }
236        }
237
238        // Step 3: Expand assignments into StreamActor + actor_location + InflightFragmentInfo.
239        let mut stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
240        let mut actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
241
242        for (fragment_id, assignments) in &actor_assignments {
243            let mut actors = Vec::with_capacity(assignments.len());
244            for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
245                actor_location.insert(actor_id, *worker_id);
246                let stream_context = streaming_job_model.stream_context();
247                actors.push(StreamActor {
248                    actor_id,
249                    fragment_id: *fragment_id,
250                    vnode_bitmap: vnode_bitmap.clone(),
251                    mview_definition: definition.to_owned(),
252                    expr_context: Some(stream_context.to_expr_context()),
253                    config_override: stream_context.config_override.clone(),
254                });
255            }
256            stream_actors.insert(*fragment_id, actors);
257        }
258
259        // Build InflightFragmentInfo from logical fragments + rendered actors.
260        let fragment_infos: HashMap<FragmentId, InflightFragmentInfo> = fragments
261            .iter()
262            .map(|(fragment_id, loaded)| {
263                let actors = stream_actors
264                    .get(fragment_id)
265                    .into_iter()
266                    .flatten()
267                    .map(|actor| {
268                        (
269                            actor.actor_id,
270                            crate::controller::fragment::InflightActorInfo {
271                                worker_id: actor_location[&actor.actor_id],
272                                vnode_bitmap: actor.vnode_bitmap.clone(),
273                                splits: vec![], // batch refresh has no source splits
274                            },
275                        )
276                    })
277                    .collect();
278                (
279                    *fragment_id,
280                    InflightFragmentInfo {
281                        fragment_id: *fragment_id,
282                        distribution_type: loaded.distribution_type,
283                        fragment_type_mask: loaded.fragment_type_mask,
284                        vnode_count: loaded.vnode_count,
285                        nodes: loaded.nodes.clone(),
286                        actors,
287                        state_table_ids: loaded.state_table_ids.clone(),
288                    },
289                )
290            })
291            .collect();
292
293        // Step 4: Build edges (internal-only, no upstream).
294        let mut builder = FragmentEdgeBuilder::new(fragment_infos.values().map(|f| {
295            (
296                f.fragment_id,
297                EdgeBuilderFragmentInfo::from_inflight_with_worker_nodes(
298                    f,
299                    partial_graph_id,
300                    worker_nodes,
301                ),
302            )
303        }));
304        builder.add_relations(downstreams);
305        let mut edges = builder.build();
306
307        let actors_to_create = edges.collect_actors_to_create(fragment_infos.values().map(|f| {
308            (
309                f.fragment_id,
310                &f.nodes,
311                f.actors.iter().map(|(actor_id, actor)| {
312                    let sa = stream_actors[&f.fragment_id]
313                        .iter()
314                        .find(|a| a.actor_id == *actor_id)
315                        .expect("should exist");
316                    (sa, actor.worker_id)
317                }),
318                vec![], // no subscribers for batch refresh jobs
319            )
320        }));
321
322        // Step 5: Build node_actors, state_table_ids.
323        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
324        let state_table_ids =
325            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
326
327        Ok(BatchRefreshRenderResult {
328            fragment_infos,
329            node_actors,
330            state_table_ids,
331            actors_to_create,
332        })
333    }
334
335    /// Build the initial `Add` mutation for the partial graph's first barrier.
336    ///
337    /// The rendered actors come from a prior `render_actors_and_build_job_info()` call;
338    /// `backfill_nodes_to_pause` is derived from the job's backfill ordering.
339    pub(crate) fn build_initial_partial_graph_mutation(
340        render_result: &BatchRefreshRenderResult,
341        backfill_ordering: &ExtendedFragmentBackfillOrder,
342    ) -> Mutation {
343        let added_actors: Vec<ActorId> = render_result
344            .fragment_infos
345            .values()
346            .flat_map(|f| f.actors.keys().copied())
347            .collect();
348        let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_ordering)
349            .into_iter()
350            .collect();
351        Mutation::Add(AddMutation {
352            actor_dispatchers: Default::default(),
353            added_actors,
354            actor_splits: Default::default(),
355            pause: false,
356            subscriptions_to_add: Default::default(),
357            backfill_nodes_to_pause,
358            actor_cdc_table_snapshot_splits: None,
359            new_upstream_sinks: Default::default(),
360        })
361    }
362
363    /// Derive no-shuffle ensembles from fragment downstreams.
364    fn resolve_ensembles(
365        fragments: &HashMap<FragmentId, LoadedFragment>,
366        downstreams: &FragmentDownstreamRelation,
367    ) -> MetaResult<Vec<NoShuffleEnsemble>> {
368        let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
369        for (upstream_fid, relations) in downstreams {
370            for rel in relations {
371                if rel.dispatcher_type == DispatcherType::NoShuffle {
372                    new_no_shuffle
373                        .entry(*upstream_fid)
374                        .or_default()
375                        .insert(rel.downstream_fragment_id);
376                }
377            }
378        }
379
380        let mut ensembles = if new_no_shuffle.is_empty() {
381            Vec::new()
382        } else {
383            let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
384                .iter()
385                .flat_map(|(u, ds)| ds.iter().map(move |d| (*u, *d)))
386                .collect();
387            let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
388                .iter()
389                .flat_map(|(u, d)| [*u, *d])
390                .collect::<HashSet<_>>()
391                .into_iter()
392                .collect();
393            let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
394            find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
395        };
396
397        // Add standalone fragments as single-fragment ensembles.
398        let covered: HashSet<FragmentId> = ensembles
399            .iter()
400            .flat_map(|e| e.component_fragments())
401            .collect();
402        for fragment_id in fragments.keys() {
403            if !covered.contains(fragment_id) {
404                ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
405            }
406        }
407
408        Ok(ensembles)
409    }
410}
411
412// ── Construction ──────────────────────────────────────────────────────────────
413
impl BatchRefreshJobCheckpointControl {
    /// Create from DDL command. Starts in `ConsumingSnapshot`.
    ///
    /// Internally calls `render_actors_and_build_job_info()` and injects the
    /// partial-graph initial barrier. In order:
    /// 1. Render actors (allocating actor ids from the environment's actor id generator) and
    ///    build the initial `Add` mutation.
    /// 2. Build the backfill-order state and the `CreateMviewProgressTracker`.
    /// 3. Register the partial graph with `partial_graph_manager` and inject a fake
    ///    checkpoint barrier carrying the new actors, the `Add` mutation, `notifiers`, and
    ///    `create_info` (which `inject_barrier` turns into the post-collect command).
    ///
    /// # Errors
    ///
    /// Returns an error if actor rendering fails, or if injecting the initial barrier fails.
    /// In the latter case the pending partial graph is marked via `graph_adder.failed()`
    /// before the error is returned.
    pub(crate) fn new(
        database_id: DatabaseId,
        job_id: JobId,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        logical: &BatchRefreshLogicalFragments,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            "new batch refresh job"
        );

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
        let backfill_ordering = &create_info.info.fragment_backfill_ordering;
        let actor_id_generator = partial_graph_manager
            .control_stream_manager()
            .env
            .actor_id_generator();

        let render_result = Self::render_actors_and_build_job_info(
            &logical.fragments,
            &logical.downstreams,
            &create_info.info.definition,
            actor_id_generator,
            worker_nodes,
            &create_info.info.database_resource_group,
            &create_info.info.streaming_job_model,
            partial_graph_id,
        )?;
        let initial_partial_graph_mutation =
            Self::build_initial_partial_graph_mutation(&render_result, backfill_ordering);

        let backfill_order_state = BackfillOrderState::new(
            backfill_ordering,
            &render_result.fragment_infos,
            create_info
                .info
                .locality_fragment_state_table_mapping
                .clone(),
        );
        // Note: the create path also builds the tracker through `recover`, seeded with the
        // freshly rendered fragments and the current version stats.
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &render_result.fragment_infos,
            backfill_order_state,
            version_stat,
        );

        // Fake epochs of a freshly created partial graph start from physical time 0.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = super::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
        );

        // If the first injection fails, mark the pending graph as failed before propagating.
        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &render_result.node_actors,
            &render_result.state_table_ids,
            initial_barrier_info,
            Some(render_result.actors_to_create),
            Some(initial_partial_graph_mutation),
            notifiers,
            Some(create_info),
            false,
        ) {
            graph_adder.failed();
            return Err(e);
        }

        graph_adder.added();
        // The initial barrier was a checkpoint, so no non-checkpoint epochs may remain pending.
        assert!(pending_non_checkpoint_barriers.is_empty());
        let this = Self {
            partial_graph_id,
            job_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            max_committed_epoch: None,
            status: BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_epoch,
                fragment_infos: render_result.fragment_infos,
                pending_non_checkpoint_barriers,
                node_actors: render_result.node_actors,
                state_table_ids: render_result.state_table_ids,
            },
        };
        Ok(this)
    }

    /// Recover from a persistent state during recovery.
    ///
    /// - If `committed_epoch >= snapshot_epoch` → Idle (snapshot completed before crash).
    ///   No partial graph is recovered; `initial_mutation` and `render_result` are dropped.
    /// - If `committed_epoch < snapshot_epoch` → `ConsumingSnapshot` using pre-rendered actors.
    ///   The partial graph is re-created through `partial_graph_recoverer` with an `Initial`
    ///   fake barrier, whose fake-epoch clock is seeded from `committed_epoch`'s physical time.
    ///
    /// In both cases `max_committed_epoch` is set to `Some(committed_epoch)`.
    ///
    /// # Errors
    ///
    /// Returns an error if `partial_graph_recoverer.recover_graph` fails.
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        backfill_order: ExtendedFragmentBackfillOrder,
        version_stat: &HummockVersionStats,
        initial_mutation: Mutation,
        render_result: BatchRefreshRenderResult,
        partial_graph_recoverer: &mut crate::barrier::partial_graph::PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        if committed_epoch >= snapshot_epoch {
            // Snapshot completed; recover to Idle.
            info!(
                %job_id,
                committed_epoch,
                snapshot_epoch,
                "recovered idle batch refresh job (no partial graph)"
            );
            return Ok(Self {
                job_id,
                partial_graph_id,
                snapshot_backfill_upstream_tables,
                snapshot_epoch,
                max_committed_epoch: Some(committed_epoch),
                status: BatchRefreshJobStatus::Idle,
            });
        }

        // Snapshot still in-progress; recover to ConsumingSnapshot.
        info!(
            %job_id,
            committed_epoch,
            snapshot_epoch,
            "recovered batch refresh job to consuming snapshot"
        );

        // Continue the fake-epoch clock from the last committed epoch's physical time.
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];

        // On recovery the locality mapping is rebuilt from the rendered fragments rather than
        // taken from a create command.
        let locality_fragment_state_table_mapping =
            crate::barrier::rpc::build_locality_fragment_state_table_mapping(
                &render_result.fragment_infos,
            );
        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
            &backfill_order,
            &render_result.fragment_infos,
            locality_fragment_state_table_mapping,
        );

        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &render_result.fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let first_barrier_info = super::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &render_result.node_actors,
            render_result.state_table_ids.iter().copied(),
            render_result.actors_to_create,
            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            max_committed_epoch: Some(committed_epoch),
            status: BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                fragment_infos: render_result.fragment_infos,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
                node_actors: render_result.node_actors,
                state_table_ids: render_result.state_table_ids,
            },
        })
    }
}
622
623// ── Barrier injection ─────────────────────────────────────────────────────────
624
625impl BatchRefreshJobCheckpointControl {
626    fn inject_barrier(
627        partial_graph_id: PartialGraphId,
628        partial_graph_manager: &mut PartialGraphManager,
629        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
630        state_table_ids: &HashSet<TableId>,
631        barrier_info: BarrierInfo,
632        new_actors: Option<StreamJobActorsToCreate>,
633        mutation: Option<Mutation>,
634        notifiers: Vec<Notifier>,
635        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
636        is_stop: bool,
637    ) -> MetaResult<()> {
638        if is_stop {
639            assert!(
640                matches!(&mutation, Some(Mutation::Stop(_))),
641                "stop barrier must carry a Stop mutation"
642            );
643        }
644        partial_graph_manager.inject_barrier(
645            partial_graph_id,
646            mutation,
647            node_actors,
648            state_table_ids.iter().copied(),
649            if is_stop {
650                // Stop barrier: data already synced by the prior checkpoint.
651                itertools::Either::Left(std::iter::empty())
652            } else {
653                itertools::Either::Right(node_actors.keys().copied())
654            },
655            new_actors,
656            PartialGraphBarrierInfo::new(
657                first_create_info.map_or_else(
658                    PostCollectCommand::barrier,
659                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
660                ),
661                barrier_info,
662                notifiers,
663                state_table_ids.clone(),
664            ),
665        )?;
666        Ok(())
667    }
668}
669
670// ── Barrier forwarding and collection ─────────────────────────────────────────
671
672impl BatchRefreshJobCheckpointControl {
673    pub(crate) fn on_new_upstream_barrier(
674        &mut self,
675        partial_graph_manager: &mut PartialGraphManager,
676        barrier_info: &BarrierInfo,
677        mutation: Option<(Mutation, Vec<Notifier>)>,
678    ) -> MetaResult<()> {
679        if !matches!(self.status, BatchRefreshJobStatus::ConsumingSnapshot { .. }) {
680            return Ok(());
681        }
682        let (mut mutation, mut notifiers) = match mutation {
683            Some((mutation, notifiers)) => (Some(mutation), notifiers),
684            None => (None, vec![]),
685        };
686
687        // Check if snapshot consumption is finished and we need to inject stop barriers.
688        let is_finished = matches!(
689            &self.status,
690            BatchRefreshJobStatus::ConsumingSnapshot { create_mview_tracker, .. }
691            if create_mview_tracker.is_finished()
692        );
693
694        if is_finished {
695            // Discard the upstream mutation — not needed for stop barriers.
696            mutation.take();
697
698            // Take the status out to destructure and transition to `FinishingSnapshot`.
699            let old_status = replace(&mut self.status, BatchRefreshJobStatus::Idle);
700            let BatchRefreshJobStatus::ConsumingSnapshot {
701                prev_epoch_fake_physical_time,
702                mut pending_non_checkpoint_barriers,
703                snapshot_epoch,
704                fragment_infos,
705                create_mview_tracker,
706                node_actors,
707                state_table_ids,
708                ..
709            } = old_status
710            else {
711                unreachable!()
712            };
713
714            let tracking_job = create_mview_tracker.into_tracking_job();
715
716            // Inject final checkpoint at snapshot epoch.
717            pending_non_checkpoint_barriers.push(snapshot_epoch);
718            let prev_epoch = Epoch::from_physical_time(prev_epoch_fake_physical_time);
719            let final_checkpoint = BarrierInfo {
720                curr_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
721                prev_epoch: TracedEpoch::new(prev_epoch),
722                kind: BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers)),
723            };
724
725            // Inject stop barrier with u64::MAX as curr_epoch and empty nodes_to_sync_table.
726            let stop_barrier = BarrierInfo {
727                prev_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
728                curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
729                kind: BarrierKind::Checkpoint(vec![snapshot_epoch]),
730            };
731
732            let stop_actors: Vec<ActorId> = fragment_infos
733                .values()
734                .flat_map(|f| f.actors.keys().copied())
735                .collect();
736
737            Self::inject_barrier(
738                self.partial_graph_id,
739                partial_graph_manager,
740                &node_actors,
741                &state_table_ids,
742                final_checkpoint,
743                None,
744                None,
745                take(&mut notifiers),
746                None,
747                false,
748            )?;
749            Self::inject_barrier(
750                self.partial_graph_id,
751                partial_graph_manager,
752                &node_actors,
753                &state_table_ids,
754                stop_barrier,
755                None,
756                Some(Mutation::Stop(StopMutation {
757                    actors: stop_actors,
758                    dropped_sink_fragments: vec![],
759                })),
760                vec![],
761                None,
762                true,
763            )?;
764
765            self.status = BatchRefreshJobStatus::FinishingSnapshot {
766                tracking_job: Some(tracking_job),
767                fragment_infos,
768            };
769        } else {
770            // Normal barrier — still consuming snapshot.
771            let BatchRefreshJobStatus::ConsumingSnapshot {
772                prev_epoch_fake_physical_time,
773                pending_non_checkpoint_barriers,
774                create_mview_tracker,
775                node_actors,
776                state_table_ids,
777                ..
778            } = &mut self.status
779            else {
780                unreachable!("is_finished was false, status must be ConsumingSnapshot")
781            };
782
783            // Forward a fake barrier to the partial graph.
784            let mutation = mutation.take().or_else(|| {
785                let pending_backfill_nodes = create_mview_tracker
786                    .take_pending_backfill_nodes()
787                    .collect_vec();
788                if pending_backfill_nodes.is_empty() {
789                    None
790                } else {
791                    Some(Mutation::StartFragmentBackfill(
792                        StartFragmentBackfillMutation {
793                            fragment_ids: pending_backfill_nodes,
794                        },
795                    ))
796                }
797            });
798            let barrier_to_inject = super::new_fake_barrier(
799                prev_epoch_fake_physical_time,
800                pending_non_checkpoint_barriers,
801                match barrier_info.kind {
802                    BarrierKind::Barrier => PbBarrierKind::Barrier,
803                    BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
804                    BarrierKind::Initial => {
805                        unreachable!("upstream new epoch should not be initial")
806                    }
807                },
808            );
809            Self::inject_barrier(
810                self.partial_graph_id,
811                partial_graph_manager,
812                node_actors,
813                state_table_ids,
814                barrier_to_inject,
815                None,
816                mutation,
817                take(&mut notifiers),
818                None,
819                false,
820            )?;
821        }
822        assert!(mutation.is_none(), "must have consumed mutation");
823        assert!(notifiers.is_empty(), "must consumed notifiers");
824        Ok(())
825    }
826
827    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
828        match &mut self.status {
829            BatchRefreshJobStatus::ConsumingSnapshot {
830                create_mview_tracker,
831                version_stats,
832                ..
833            } => {
834                for progress in collected_barrier
835                    .resps
836                    .values()
837                    .flat_map(|resp| &resp.create_mview_progress)
838                {
839                    create_mview_tracker.apply_progress(progress, version_stats);
840                }
841                create_mview_tracker.is_finished()
842            }
843            _ => false,
844        }
845    }
846}
847
848// ── Completing ────────────────────────────────────────────────────────────────
849
850impl BatchRefreshJobCheckpointControl {
851    #[expect(clippy::type_complexity)]
852    pub(crate) fn start_completing(
853        &mut self,
854        partial_graph_manager: &mut PartialGraphManager,
855    ) -> Option<(
856        u64,
857        HashMap<WorkerId, BarrierCompleteResponse>,
858        PartialGraphBarrierInfo,
859        Option<TrackingJob>,
860    )> {
861        match &self.status {
862            BatchRefreshJobStatus::ConsumingSnapshot { .. }
863            | BatchRefreshJobStatus::FinishingSnapshot { .. } => {}
864            BatchRefreshJobStatus::Idle | BatchRefreshJobStatus::Resetting { .. } => {
865                return None;
866            }
867        };
868
869        partial_graph_manager
870            .start_completing(
871                self.partial_graph_id,
872                std::ops::Bound::Unbounded,
873                |_non_checkpoint_epoch, _resps, _| {
874                    // Progress already applied in `collect()`.
875                },
876            )
877            .map(|(epoch, resps, info)| {
878                // Take tracking job only when completing the snapshot epoch
879                // (the stop barrier). By this point the final checkpoint has
880                // already been committed in a prior task.
881                let tracking_job = if epoch == self.snapshot_epoch {
882                    match &mut self.status {
883                        BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. } => Some(
884                            tracking_job
885                                .take()
886                                .expect("tracking job should not have been taken yet"),
887                        ),
888                        _ => panic!(
889                            "batch refresh job {}: expected FinishingSnapshot at snapshot epoch",
890                            self.job_id
891                        ),
892                    }
893                } else {
894                    None
895                };
896                (epoch, resps, info, tracking_job)
897            })
898    }
899
900    pub(super) fn ack_completed(
901        &mut self,
902        partial_graph_manager: &mut PartialGraphManager,
903        completed_epoch: u64,
904    ) {
905        partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
906        if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
907            assert!(completed_epoch > prev_max_committed_epoch);
908        }
909
910        if completed_epoch == self.snapshot_epoch {
911            // The stop barrier (prev_epoch = snapshot_epoch) has been committed.
912            // Assert expected state and transition to idle.
913            match &self.status {
914                BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. } => {
915                    assert!(
916                        tracking_job.is_none(),
917                        "tracking job should have been taken at start_completing"
918                    );
919                }
920                _ => panic!(
921                    "batch refresh job {}: expected FinishingSnapshot when completing snapshot epoch",
922                    self.job_id
923                ),
924            }
925            info!(
926                job_id = %self.job_id,
927                completed_epoch,
928                "batch refresh job: transitioned to idle, removing partial graph"
929            );
930            partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
931            self.status = BatchRefreshJobStatus::Idle;
932        }
933    }
934
935    /// Called when the partial graph reset is confirmed (drop only).
936    pub(super) fn on_partial_graph_reset(mut self) {
937        match &mut self.status {
938            BatchRefreshJobStatus::Resetting { notifiers } => {
939                for notifier in notifiers.drain(..) {
940                    notifier.notify_collected();
941                }
942            }
943            _ => {
944                panic!(
945                    "batch refresh job {}: on_partial_graph_reset in unexpected state {:?}",
946                    self.job_id, self.status
947                );
948            }
949        }
950    }
951}
952
953// ── Query methods ─────────────────────────────────────────────────────────────
954
955impl BatchRefreshJobCheckpointControl {
956    pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
957        match &self.status {
958            BatchRefreshJobStatus::ConsumingSnapshot {
959                create_mview_tracker,
960                ..
961            } => {
962                let progress = if create_mview_tracker.is_finished() {
963                    "Snapshot finished".to_owned()
964                } else {
965                    let progress = create_mview_tracker.gen_backfill_progress();
966                    format!("BatchRefresh Snapshot [{}]", progress)
967                };
968                Some(BackfillProgress {
969                    progress,
970                    backfill_type: PbBackfillType::SnapshotBackfill,
971                })
972            }
973            BatchRefreshJobStatus::FinishingSnapshot { .. } => Some(BackfillProgress {
974                progress: "BatchRefresh Stopping".to_owned(),
975                backfill_type: PbBackfillType::SnapshotBackfill,
976            }),
977            BatchRefreshJobStatus::Idle | BatchRefreshJobStatus::Resetting { .. } => None,
978        }
979    }
980
981    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
982        match &self.status {
983            BatchRefreshJobStatus::ConsumingSnapshot {
984                create_mview_tracker,
985                fragment_infos,
986                ..
987            } => create_mview_tracker.collect_fragment_progress(fragment_infos, true),
988            BatchRefreshJobStatus::FinishingSnapshot { fragment_infos, .. } => {
989                collect_done_fragments(self.job_id, fragment_infos)
990            }
991            _ => vec![],
992        }
993    }
994
995    /// Returns the pinned upstream log epoch and upstream table IDs.
996    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
997        (
998            self.snapshot_epoch,
999            self.snapshot_backfill_upstream_tables.clone(),
1000        )
1001    }
1002
1003    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
1004        match &self.status {
1005            BatchRefreshJobStatus::ConsumingSnapshot { fragment_infos, .. } => Some(fragment_infos),
1006            BatchRefreshJobStatus::FinishingSnapshot { .. }
1007            | BatchRefreshJobStatus::Idle
1008            | BatchRefreshJobStatus::Resetting { .. } => None,
1009        }
1010    }
1011
1012    pub(crate) fn is_snapshot_backfilling(&self) -> bool {
1013        matches!(
1014            self.status,
1015            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1016                | BatchRefreshJobStatus::FinishingSnapshot { .. }
1017        )
1018    }
1019}
1020
1021// ── Drop handling ─────────────────────────────────────────────────────────────
1022
1023impl BatchRefreshJobCheckpointControl {
1024    /// Drop this batch refresh job.
1025    pub(super) fn drop(
1026        &mut self,
1027        notifiers: &mut Vec<Notifier>,
1028        partial_graph_manager: &mut PartialGraphManager,
1029    ) -> bool {
1030        match &mut self.status {
1031            BatchRefreshJobStatus::Resetting {
1032                notifiers: existing_notifiers,
1033            } => {
1034                for notifier in &mut *notifiers {
1035                    notifier.notify_started();
1036                }
1037                existing_notifiers.append(notifiers);
1038                true
1039            }
1040            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1041            | BatchRefreshJobStatus::FinishingSnapshot { .. } => {
1042                for notifier in &mut *notifiers {
1043                    notifier.notify_started();
1044                }
1045                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1046                self.status = BatchRefreshJobStatus::Resetting {
1047                    notifiers: take(notifiers),
1048                };
1049                true
1050            }
1051            BatchRefreshJobStatus::Idle => {
1052                // Idle has no running partial graph, but we still go through
1053                // the reset flow so the cleanup path is uniform.
1054                for notifier in &mut *notifiers {
1055                    notifier.notify_started();
1056                }
1057                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1058                self.status = BatchRefreshJobStatus::Resetting {
1059                    notifiers: take(notifiers),
1060                };
1061                true
1062            }
1063        }
1064    }
1065
1066    /// Reset during database recovery.
1067    ///
1068    /// Returns `true` if the partial graph was already resetting (from a prior drop),
1069    /// meaning we should not issue a new reset request.
1070    pub(crate) fn reset(self) -> bool {
1071        match self.status {
1072            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1073            | BatchRefreshJobStatus::FinishingSnapshot { .. }
1074            | BatchRefreshJobStatus::Idle => false,
1075            BatchRefreshJobStatus::Resetting { notifiers } => {
1076                for notifier in notifiers {
1077                    notifier.notify_collected();
1078                }
1079                true
1080            }
1081        }
1082    }
1083}
1084
1085// ── Barrier stats ─────────────────────────────────────────────────────────────
1086
/// Barrier metrics reported for a batch refresh job's partial graph.
///
/// The handles are registered under the snapshot-backfill metric families and
/// labelled by job id (see `BatchRefreshBarrierStats::new`).
struct BatchRefreshBarrierStats {
    /// Histogram of per-barrier latency, observed in seconds.
    barrier_latency: LabelGuardedHistogram,
    /// Gauge holding the current number of in-flight barriers.
    inflight_barrier_num: LabelGuardedIntGauge,
}
1091
1092impl BatchRefreshBarrierStats {
1093    fn new(job_id: JobId, _snapshot_epoch: u64) -> Self {
1094        let table_id_str = format!("{}", job_id);
1095        Self {
1096            barrier_latency: GLOBAL_META_METRICS
1097                .snapshot_backfill_barrier_latency
1098                .with_guarded_label_values(&[table_id_str.as_str(), "batch_refresh_snapshot"]),
1099            inflight_barrier_num: GLOBAL_META_METRICS
1100                .snapshot_backfill_inflight_barrier_num
1101                .with_guarded_label_values(&[&table_id_str]),
1102        }
1103    }
1104}
1105
1106impl PartialGraphStat for BatchRefreshBarrierStats {
1107    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
1108        self.barrier_latency.observe(barrier_latency_secs);
1109    }
1110
1111    fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
1112        self.inflight_barrier_num.set(inflight_barrier_num as _);
1113    }
1114}