Skip to main content

risingwave_meta/barrier/checkpoint/independent_job/batch_refresh_job/
mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Batch refresh job checkpoint control for periodically-refreshed materialized views.
//!
//! It lives permanently in `DatabaseCheckpointControl.independent_checkpoint_job_controls`
//! as an `IndependentCheckpointJobControl::BatchRefresh` variant for its entire lifetime.
//!
//! Lifecycle:
//!   DDL → `ConsumingSnapshot` → `FinishingSnapshot` → `Idle`
//!   `Idle` → (periodic trigger) → `InitializingBatchRefresh` → `ConsumingLogStore` → `Idle`
//!
//! Dropping the job moves it to `Resetting` while its partial graph is reset.
24
25use std::collections::{HashMap, HashSet};
26use std::mem::{replace, take};
27use std::sync::atomic::AtomicU32;
28
29use anyhow::anyhow;
30use itertools::Itertools;
31use risingwave_common::catalog::{DatabaseId, TableId};
32use risingwave_common::id::JobId;
33use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
34use risingwave_common::util::epoch::{Epoch, EpochPair};
35use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
36use risingwave_pb::common::WorkerNode;
37use risingwave_pb::ddl_service::PbBackfillType;
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
40use risingwave_pb::stream_plan::barrier::PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::{AddMutation, StartFragmentBackfillMutation, StopMutation};
43use risingwave_pb::stream_service::BarrierCompleteResponse;
44use tracing::{debug, info};
45
46use crate::MetaResult;
47use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
48use crate::barrier::command::PostCollectCommand;
49use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
50use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
51use crate::barrier::info::BarrierInfo;
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::{
54    CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphStat,
55};
56use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
57use crate::barrier::rpc::to_partial_graph_id;
58use crate::barrier::{
59    BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
60};
61use crate::controller::fragment::InflightFragmentInfo;
62use crate::controller::scale::{
63    ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
64    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
65};
66use crate::model::{
67    FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate, StreamingJobModelContextExt,
68};
69use crate::rpc::metrics::GLOBAL_META_METRICS;
70use crate::stream::ExtendedFragmentBackfillOrder;
71
72// ── Public types ──────────────────────────────────────────────────────────────
73
74/// Logical fragment metadata for a batch refresh job.
75///
76/// Contains only catalog-level information: fragment structure, stream plan nodes,
77/// distribution, and downstream relations. No actor IDs, no worker placement.
78///
79/// Used as the uniform input for `render_actors_and_build_job_info()`, which performs
80/// actor rendering (ID allocation, worker placement, vnode assignment) internally.
81/// No-shuffle ensembles are derived from `downstreams` internally.
82#[derive(Debug)]
83pub(crate) struct BatchRefreshLogicalFragments {
84    /// Logical fragments of this job. Keyed by `fragment_id`.
85    pub fragments: HashMap<FragmentId, LoadedFragment>,
86    /// Internal downstream relations (intra-job only; no upstream edges).
87    pub downstreams: FragmentDownstreamRelation,
88}
89
90/// Result of the unified actor rendering for a batch refresh job.
91///
92/// Produced by `render_actors_and_build_job_info()` and consumed by both
93/// `new()` (create) and `recover()`.
94#[derive(Debug)]
95pub(crate) struct BatchRefreshRenderResult {
96    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
97    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
98    pub state_table_ids: HashSet<TableId>,
99    pub actors_to_create: StreamJobActorsToCreate,
100}
101
102// ── Batch refresh job metadata ───────────────────────────────────────────────
103
104/// Lightweight metadata for re-rendering actors on each periodic refresh run.
105///
106/// Loaded asynchronously on every trigger via `load_batch_refresh_trigger_context()`.
107/// Contains the pieces consumed by `from_context()` and `render_actors_and_build_job_info()`,
108/// as well as the resolved upstream log epochs and target epoch for this trigger.
109#[derive(Debug)]
110pub(crate) struct BatchRefreshJobTriggerContext {
111    pub fragments: HashMap<FragmentId, LoadedFragment>,
112    pub downstreams: FragmentDownstreamRelation,
113    pub streaming_job_model: streaming_job::Model,
114    pub definition: String,
115    pub database_resource_group: String,
116    /// Changelog entries per upstream table, used to derive log barriers.
117    pub upstream_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
118    /// The upstream committed epoch to catch up to.
119    pub target_upstream_epoch: u64,
120}
121
122// ── Status ────────────────────────────────────────────────────────────────────
123
124/// The partial graph is being reset (always for drop).
125/// Once the reset is confirmed, the job is removed from the map.
126
127#[derive(Debug)]
128enum BatchRefreshJobStatus {
129    /// The job is consuming upstream snapshot.
130    ///
131    /// Once snapshot consumption finishes, the final checkpoint + stop barriers are injected
132    /// and the status transitions to `FinishingSnapshot`.
133    ConsumingSnapshot {
134        prev_epoch_fake_physical_time: u64,
135        version_stats: HummockVersionStats,
136        create_mview_tracker: CreateMviewProgressTracker,
137        snapshot_epoch: u64,
138        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
139        pending_non_checkpoint_barriers: Vec<u64>,
140        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
141        state_table_ids: HashSet<TableId>,
142    },
143    /// The job has finished consuming the snapshot.
144    ///
145    /// The final checkpoint barrier (at `snapshot_epoch`) and the stop barrier have been
146    /// injected. Once the stop epoch is committed the job transitions to `Idle`.
147    /// The committed epoch is expected to be the snapshot epoch when the snapshot
148    /// consumption finishes.
149    FinishingSnapshot {
150        tracking_job: Option<TrackingJob>,
151        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
152    },
153    /// The job is idle, waiting for the next trigger. No partial graph is held.
154    Idle { last_committed_epoch: u64 },
155    /// The job has created a partial graph for periodic refresh and is waiting for
156    /// the initial barrier to bootstrap the newly-created actors.
157    InitializingBatchRefresh {
158        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
159        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
160        state_table_ids: HashSet<TableId>,
161        /// Log barriers to inject after the partial graph is initialized. The
162        /// last one is the checkpoint stop barrier with `curr_epoch = u64::MAX`.
163        pending_log_barriers: Vec<BarrierInfo>,
164        logstore_start_epoch: u64,
165        target_upstream_epoch: u64,
166    },
167    /// The job is consuming upstream log store changes (periodic refresh).
168    ///
169    /// All replay barriers have been pre-injected (last with `StopMutation` at
170    /// `curr_epoch = u64::MAX`). When `target_upstream_epoch` commits,
171    /// the partial graph is removed and the job transitions to `Idle`.
172    ConsumingLogStore {
173        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
174        /// The epoch from which log consumption started (for `pinned_upstream_log_epoch`).
175        logstore_start_epoch: u64,
176        /// `prev_epoch` of the stop barrier; becomes `last_committed_epoch` when transitioning to Idle.
177        target_upstream_epoch: u64,
178    },
179    /// The partial graph is being reset (for drop).
180    Resetting { notifiers: Vec<Notifier> },
181}
182
183// ── Complete type ─────────────────────────────────────────────────────────────
184
185// ── Main checkpoint control ───────────────────────────────────────────────────
186
187/// Self-contained checkpoint control for a batch refresh MV.
188///
189/// Unlike `CreatingStreamingJobControl`, this struct handles the full lifecycle
190/// (snapshot → idle → re-run → idle → ...). Both types are stored together in
191/// `DatabaseCheckpointControl.independent_checkpoint_job_controls` as
192/// `IndependentCheckpointJobControl` variants.
193#[derive(Debug)]
194pub(crate) struct BatchRefreshJobCheckpointControl {
195    job_id: JobId,
196    partial_graph_id: PartialGraphId,
197    snapshot_backfill_upstream_tables: HashSet<TableId>,
198    snapshot_epoch: u64,
199    /// Batch refresh interval in seconds. Used to determine when to trigger a refresh run.
200    batch_refresh_seconds: u64,
201
202    status: BatchRefreshJobStatus,
203}
204
205// ── Unified actor rendering ───────────────────────────────────────────────────
206
207impl BatchRefreshJobCheckpointControl {
208    /// Render actors for a batch refresh job from logical metadata only.
209    ///
210    /// Performs the full pipeline:
211    /// 1. Derive no-shuffle ensembles from `downstreams`
212    /// 2. Render actor assignments (ID allocation, worker placement, vnode bitmap)
213    /// 3. Build `StreamActor` structs
214    /// 4. Build internal-only edges (no upstream dispatcher edges)
215    /// 5. Produce `fragment_infos`, `node_actors`, `state_table_ids`, `actors_to_create`
216    ///
217    /// Shared by both the DDL create path and the recovery path.
218    pub(crate) fn render_actors_and_build_job_info(
219        fragments: &HashMap<FragmentId, LoadedFragment>,
220        downstreams: &FragmentDownstreamRelation,
221        definition: &str,
222        // Actor rendering context:
223        actor_id_generator: &AtomicU32,
224        worker_nodes: &HashMap<WorkerId, WorkerNode>,
225        database_resource_group: &str,
226        streaming_job_model: &streaming_job::Model,
227        // Edge building context:
228        partial_graph_id: PartialGraphId,
229    ) -> MetaResult<BatchRefreshRenderResult> {
230        // Step 1: Derive no-shuffle ensembles from downstreams.
231        let ensembles = Self::resolve_ensembles(fragments, downstreams)?;
232
233        // Step 2: Render actor assignments for each ensemble.
234        let mut actor_assignments: HashMap<
235            FragmentId,
236            HashMap<ActorId, (WorkerId, Option<risingwave_common::bitmap::Bitmap>)>,
237        > = HashMap::new();
238
239        for ensemble in &ensembles {
240            // All fragments are new (batch refresh has no existing upstream fragments).
241            let first_component = ensemble
242                .component_fragments()
243                .next()
244                .expect("ensemble must have at least one component");
245            let fragment = &fragments[&first_component];
246            let distribution_type = fragment.distribution_type;
247            let vnode_count = fragment.vnode_count;
248
249            // Assert all component fragments share the same vnode count.
250            for fid in ensemble.component_fragments() {
251                let f = &fragments[&fid];
252                assert_eq!(
253                    vnode_count, f.vnode_count,
254                    "fragments {} and {} in same ensemble have different vnode counts",
255                    first_component, fid,
256                );
257            }
258
259            let entry_fragment_parallelism = Itertools::exactly_one(
260                ensemble
261                    .entry_fragments()
262                    .map(|fid| fragments[&fid].parallelism.clone())
263                    .dedup(),
264            )
265            .map_err(|_| {
266                anyhow!(
267                    "entry fragments have inconsistent parallelism settings in batch refresh job"
268                )
269            })?;
270
271            let actor_template = EnsembleActorTemplate::render_new(
272                streaming_job_model,
273                worker_nodes,
274                entry_fragment_parallelism,
275                database_resource_group.to_owned(),
276                distribution_type,
277                vnode_count,
278            )?;
279
280            for fid in ensemble.component_fragments() {
281                let f = &fragments[&fid];
282                let aligner =
283                    ComponentFragmentAligner::new_persistent(&actor_template, actor_id_generator);
284                let assignments = aligner.align_component_actor(f.distribution_type);
285                actor_assignments.insert(fid, assignments);
286            }
287        }
288
289        // Step 3: Expand assignments into StreamActor + actor_location + InflightFragmentInfo.
290        let mut stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
291        let mut actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
292
293        for (fragment_id, assignments) in &actor_assignments {
294            let mut actors = Vec::with_capacity(assignments.len());
295            for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
296                actor_location.insert(actor_id, *worker_id);
297                let stream_context = streaming_job_model.stream_context();
298                actors.push(StreamActor {
299                    actor_id,
300                    fragment_id: *fragment_id,
301                    vnode_bitmap: vnode_bitmap.clone(),
302                    mview_definition: definition.to_owned(),
303                    expr_context: Some(stream_context.to_expr_context()),
304                    config_override: stream_context.config_override.clone(),
305                });
306            }
307            stream_actors.insert(*fragment_id, actors);
308        }
309
310        // Build InflightFragmentInfo from logical fragments + rendered actors.
311        let fragment_infos: HashMap<FragmentId, InflightFragmentInfo> = fragments
312            .iter()
313            .map(|(fragment_id, loaded)| {
314                let actors = stream_actors
315                    .get(fragment_id)
316                    .into_iter()
317                    .flatten()
318                    .map(|actor| {
319                        (
320                            actor.actor_id,
321                            crate::controller::fragment::InflightActorInfo {
322                                worker_id: actor_location[&actor.actor_id],
323                                vnode_bitmap: actor.vnode_bitmap.clone(),
324                                splits: vec![], // batch refresh has no source splits
325                            },
326                        )
327                    })
328                    .collect();
329                (
330                    *fragment_id,
331                    InflightFragmentInfo {
332                        fragment_id: *fragment_id,
333                        distribution_type: loaded.distribution_type,
334                        fragment_type_mask: loaded.fragment_type_mask,
335                        vnode_count: loaded.vnode_count,
336                        nodes: loaded.nodes.clone(),
337                        actors,
338                        state_table_ids: loaded.state_table_ids.clone(),
339                    },
340                )
341            })
342            .collect();
343
344        // Step 4: Build edges (internal-only, no upstream).
345        let mut builder = FragmentEdgeBuilder::new(fragment_infos.values().map(|f| {
346            (
347                f.fragment_id,
348                EdgeBuilderFragmentInfo::from_inflight_with_worker_nodes(
349                    f,
350                    partial_graph_id,
351                    worker_nodes,
352                ),
353            )
354        }));
355        builder.add_relations(downstreams);
356        let mut edges = builder.build();
357
358        let actors_to_create = edges.collect_actors_to_create(fragment_infos.values().map(|f| {
359            (
360                f.fragment_id,
361                &f.nodes,
362                f.actors.iter().map(|(actor_id, actor)| {
363                    let sa = stream_actors[&f.fragment_id]
364                        .iter()
365                        .find(|a| a.actor_id == *actor_id)
366                        .expect("should exist");
367                    (sa, actor.worker_id)
368                }),
369                vec![], // no subscribers for batch refresh jobs
370            )
371        }));
372
373        // Step 5: Build node_actors, state_table_ids.
374        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
375        let state_table_ids =
376            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
377
378        Ok(BatchRefreshRenderResult {
379            fragment_infos,
380            node_actors,
381            state_table_ids,
382            actors_to_create,
383        })
384    }
385
386    /// Build the initial `Add` mutation for the partial graph's first barrier.
387    ///
388    /// The rendered actors come from a prior `render_actors_and_build_job_info()` call;
389    /// `backfill_nodes_to_pause` is derived from the job's backfill ordering.
390    pub(crate) fn build_initial_partial_graph_mutation(
391        render_result: &BatchRefreshRenderResult,
392        backfill_ordering: &ExtendedFragmentBackfillOrder,
393    ) -> Mutation {
394        let added_actors: Vec<ActorId> = render_result
395            .fragment_infos
396            .values()
397            .flat_map(|f| f.actors.keys().copied())
398            .collect();
399        let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_ordering)
400            .into_iter()
401            .collect();
402        Mutation::Add(AddMutation {
403            actor_dispatchers: Default::default(),
404            added_actors,
405            actor_splits: Default::default(),
406            pause: false,
407            subscriptions_to_add: Default::default(),
408            backfill_nodes_to_pause,
409            actor_cdc_table_snapshot_splits: None,
410            new_upstream_sinks: Default::default(),
411        })
412    }
413
414    /// Derive no-shuffle ensembles from fragment downstreams.
415    fn resolve_ensembles(
416        fragments: &HashMap<FragmentId, LoadedFragment>,
417        downstreams: &FragmentDownstreamRelation,
418    ) -> MetaResult<Vec<NoShuffleEnsemble>> {
419        let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
420        for (upstream_fid, relations) in downstreams {
421            for rel in relations {
422                if rel.dispatcher_type == DispatcherType::NoShuffle {
423                    new_no_shuffle
424                        .entry(*upstream_fid)
425                        .or_default()
426                        .insert(rel.downstream_fragment_id);
427                }
428            }
429        }
430
431        let mut ensembles = if new_no_shuffle.is_empty() {
432            Vec::new()
433        } else {
434            let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
435                .iter()
436                .flat_map(|(u, ds)| ds.iter().map(move |d| (*u, *d)))
437                .collect();
438            let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
439                .iter()
440                .flat_map(|(u, d)| [*u, *d])
441                .collect::<HashSet<_>>()
442                .into_iter()
443                .collect();
444            let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
445            find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
446        };
447
448        // Add standalone fragments as single-fragment ensembles.
449        let covered: HashSet<FragmentId> = ensembles
450            .iter()
451            .flat_map(|e| e.component_fragments())
452            .collect();
453        for fragment_id in fragments.keys() {
454            if !covered.contains(fragment_id) {
455                ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
456            }
457        }
458
459        Ok(ensembles)
460    }
461}
462
463// ── Construction ──────────────────────────────────────────────────────────────
464
465impl BatchRefreshJobCheckpointControl {
466    /// Create from DDL command. Starts in `ConsumingSnapshot`.
467    ///
468    /// Internally calls `render_actors_and_build_job_info()` and injects the
469    /// partial-graph initial barrier.
470    #[expect(clippy::too_many_arguments)]
471    pub(crate) fn new(
472        database_id: DatabaseId,
473        job_id: JobId,
474        create_info: CreateSnapshotBackfillJobCommandInfo,
475        notifiers: Vec<Notifier>,
476        snapshot_backfill_upstream_tables: HashSet<TableId>,
477        snapshot_epoch: u64,
478        version_stat: &HummockVersionStats,
479        partial_graph_manager: &mut PartialGraphManager,
480        logical: &BatchRefreshLogicalFragments,
481        worker_nodes: &HashMap<WorkerId, WorkerNode>,
482        batch_refresh_seconds: u64,
483    ) -> MetaResult<Self> {
484        debug!(
485            %job_id,
486            "new batch refresh job"
487        );
488
489        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
490        let backfill_ordering = &create_info.info.fragment_backfill_ordering;
491        let actor_id_generator = partial_graph_manager
492            .control_stream_manager()
493            .env
494            .actor_id_generator();
495
496        let render_result = Self::render_actors_and_build_job_info(
497            &logical.fragments,
498            &logical.downstreams,
499            &create_info.info.definition,
500            actor_id_generator,
501            worker_nodes,
502            &create_info.info.database_resource_group,
503            &create_info.info.streaming_job_model,
504            partial_graph_id,
505        )?;
506        let initial_partial_graph_mutation =
507            Self::build_initial_partial_graph_mutation(&render_result, backfill_ordering);
508
509        let backfill_order_state = BackfillOrderState::new(
510            backfill_ordering,
511            &render_result.fragment_infos,
512            create_info
513                .info
514                .locality_fragment_state_table_mapping
515                .clone(),
516        );
517        let create_mview_tracker = CreateMviewProgressTracker::recover(
518            job_id,
519            &render_result.fragment_infos,
520            backfill_order_state,
521            version_stat,
522        );
523
524        let mut prev_epoch_fake_physical_time = 0;
525        let mut pending_non_checkpoint_barriers = vec![];
526
527        let initial_barrier_info = super::new_fake_barrier(
528            &mut prev_epoch_fake_physical_time,
529            &mut pending_non_checkpoint_barriers,
530            PbBarrierKind::Checkpoint,
531        );
532
533        let mut graph_adder = partial_graph_manager.add_partial_graph(
534            partial_graph_id,
535            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
536        );
537
538        if let Err(e) = Self::inject_barrier(
539            partial_graph_id,
540            graph_adder.manager(),
541            &render_result.node_actors,
542            &render_result.state_table_ids,
543            initial_barrier_info,
544            Some(render_result.actors_to_create),
545            Some(initial_partial_graph_mutation),
546            notifiers,
547            Some(create_info),
548            false,
549        ) {
550            graph_adder.failed();
551            return Err(e);
552        }
553
554        graph_adder.added();
555        assert!(pending_non_checkpoint_barriers.is_empty());
556        let this = Self {
557            partial_graph_id,
558            job_id,
559            snapshot_backfill_upstream_tables,
560            snapshot_epoch,
561            batch_refresh_seconds,
562
563            status: BatchRefreshJobStatus::ConsumingSnapshot {
564                prev_epoch_fake_physical_time,
565                version_stats: version_stat.clone(),
566                create_mview_tracker,
567                snapshot_epoch,
568                fragment_infos: render_result.fragment_infos,
569                pending_non_checkpoint_barriers,
570                node_actors: render_result.node_actors,
571                state_table_ids: render_result.state_table_ids,
572            },
573        };
574        Ok(this)
575    }
576
577    /// Recover from a persistent state during recovery.
578    ///
579    /// - If `committed_epoch >= snapshot_epoch` → Idle (snapshot completed before crash).
580    /// - If `committed_epoch < snapshot_epoch` → `ConsumingSnapshot` using pre-rendered actors.
581    #[expect(clippy::too_many_arguments)]
582    pub(crate) fn recover(
583        database_id: DatabaseId,
584        job_id: JobId,
585        snapshot_backfill_upstream_tables: HashSet<TableId>,
586        snapshot_epoch: u64,
587        committed_epoch: u64,
588        backfill_order: ExtendedFragmentBackfillOrder,
589        version_stat: &HummockVersionStats,
590        initial_mutation: Mutation,
591        render_result: BatchRefreshRenderResult,
592        partial_graph_recoverer: &mut crate::barrier::partial_graph::PartialGraphRecoverer<'_>,
593        batch_refresh_seconds: u64,
594    ) -> MetaResult<Self> {
595        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
596
597        if committed_epoch >= snapshot_epoch {
598            // Snapshot completed; recover to Idle.
599            info!(
600                %job_id,
601                committed_epoch,
602                snapshot_epoch,
603                "recovered idle batch refresh job (no partial graph)"
604            );
605            return Ok(Self {
606                job_id,
607                partial_graph_id,
608                snapshot_backfill_upstream_tables,
609                snapshot_epoch,
610                batch_refresh_seconds,
611
612                status: BatchRefreshJobStatus::Idle {
613                    last_committed_epoch: committed_epoch,
614                },
615            });
616        }
617
618        // Snapshot still in-progress; recover to ConsumingSnapshot.
619        info!(
620            %job_id,
621            committed_epoch,
622            snapshot_epoch,
623            "recovered batch refresh job to consuming snapshot"
624        );
625
626        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
627        let mut pending_non_checkpoint_barriers = vec![];
628
629        let locality_fragment_state_table_mapping =
630            crate::barrier::rpc::build_locality_fragment_state_table_mapping(
631                &render_result.fragment_infos,
632            );
633        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
634            &backfill_order,
635            &render_result.fragment_infos,
636            locality_fragment_state_table_mapping,
637        );
638
639        let create_mview_tracker = CreateMviewProgressTracker::recover(
640            job_id,
641            &render_result.fragment_infos,
642            backfill_order_state,
643            version_stat,
644        );
645
646        let first_barrier_info = super::new_fake_barrier(
647            &mut prev_epoch_fake_physical_time,
648            &mut pending_non_checkpoint_barriers,
649            PbBarrierKind::Initial,
650        );
651
652        partial_graph_recoverer.recover_graph(
653            partial_graph_id,
654            initial_mutation,
655            &first_barrier_info,
656            &render_result.node_actors,
657            render_result.state_table_ids.iter().copied(),
658            render_result.actors_to_create,
659            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
660        )?;
661
662        Ok(Self {
663            job_id,
664            partial_graph_id,
665            snapshot_backfill_upstream_tables,
666            snapshot_epoch,
667            batch_refresh_seconds,
668            status: BatchRefreshJobStatus::ConsumingSnapshot {
669                prev_epoch_fake_physical_time,
670                version_stats: version_stat.clone(),
671                create_mview_tracker,
672                fragment_infos: render_result.fragment_infos,
673                snapshot_epoch,
674                pending_non_checkpoint_barriers,
675                node_actors: render_result.node_actors,
676                state_table_ids: render_result.state_table_ids,
677            },
678        })
679    }
680}
681
682// ── Barrier injection ─────────────────────────────────────────────────────────
683
684impl BatchRefreshJobCheckpointControl {
685    fn inject_barrier(
686        partial_graph_id: PartialGraphId,
687        partial_graph_manager: &mut PartialGraphManager,
688        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
689        state_table_ids: &HashSet<TableId>,
690        barrier_info: BarrierInfo,
691        new_actors: Option<StreamJobActorsToCreate>,
692        mutation: Option<Mutation>,
693        notifiers: Vec<Notifier>,
694        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
695        is_stop: bool,
696    ) -> MetaResult<()> {
697        if is_stop {
698            assert!(
699                matches!(&mutation, Some(Mutation::Stop(_))),
700                "stop barrier must carry a Stop mutation"
701            );
702        }
703        partial_graph_manager.inject_barrier(
704            partial_graph_id,
705            mutation,
706            node_actors,
707            state_table_ids.iter().copied(),
708            if is_stop {
709                // Stop barrier: data already synced by the prior checkpoint.
710                itertools::Either::Left(std::iter::empty())
711            } else {
712                itertools::Either::Right(node_actors.keys().copied())
713            },
714            new_actors,
715            PartialGraphBarrierInfo::new(
716                first_create_info.map_or_else(
717                    PostCollectCommand::barrier,
718                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
719                ),
720                barrier_info,
721                notifiers,
722                state_table_ids.clone(),
723            ),
724        )?;
725        Ok(())
726    }
727}
728
729// ── Barrier forwarding and collection ─────────────────────────────────────────
730
731impl BatchRefreshJobCheckpointControl {
732    pub(crate) fn on_new_upstream_barrier(
733        &mut self,
734        partial_graph_manager: &mut PartialGraphManager,
735        barrier_info: &BarrierInfo,
736        mutation: Option<(Mutation, Vec<Notifier>)>,
737    ) -> MetaResult<()> {
738        if !matches!(self.status, BatchRefreshJobStatus::ConsumingSnapshot { .. }) {
739            // ConsumingLogStore has all barriers pre-injected; no forwarding needed.
740            // Idle and Resetting have no partial graph.
741            return Ok(());
742        }
743        let (mut mutation, mut notifiers) = match mutation {
744            Some((mutation, notifiers)) => (Some(mutation), notifiers),
745            None => (None, vec![]),
746        };
747
748        // Check if snapshot consumption is finished and we need to inject stop barriers.
749        let is_finished = matches!(
750            &self.status,
751            BatchRefreshJobStatus::ConsumingSnapshot { create_mview_tracker, .. }
752            if create_mview_tracker.is_finished()
753        );
754
755        if is_finished {
756            // Discard the upstream mutation — not needed for stop barriers.
757            mutation.take();
758
759            // Take the status out to destructure and transition to `FinishingSnapshot`.
760            // Use a placeholder; will be overwritten below.
761            let old_status = replace(
762                &mut self.status,
763                BatchRefreshJobStatus::Idle {
764                    last_committed_epoch: 0,
765                },
766            );
767            let BatchRefreshJobStatus::ConsumingSnapshot {
768                prev_epoch_fake_physical_time,
769                mut pending_non_checkpoint_barriers,
770                snapshot_epoch,
771                fragment_infos,
772                create_mview_tracker,
773                node_actors,
774                state_table_ids,
775                ..
776            } = old_status
777            else {
778                unreachable!()
779            };
780
781            let tracking_job = create_mview_tracker.into_tracking_job();
782
783            // Inject final checkpoint at snapshot epoch.
784            pending_non_checkpoint_barriers.push(snapshot_epoch);
785            let prev_epoch = Epoch::from_physical_time(prev_epoch_fake_physical_time);
786            let final_checkpoint = BarrierInfo {
787                curr_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
788                prev_epoch: TracedEpoch::new(prev_epoch),
789                kind: BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers)),
790            };
791
792            // Inject stop barrier with u64::MAX as curr_epoch and empty nodes_to_sync_table.
793            let stop_barrier = BarrierInfo {
794                prev_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
795                curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
796                kind: BarrierKind::Checkpoint(vec![snapshot_epoch]),
797            };
798
799            let stop_actors: Vec<ActorId> = fragment_infos
800                .values()
801                .flat_map(|f| f.actors.keys().copied())
802                .collect();
803
804            Self::inject_barrier(
805                self.partial_graph_id,
806                partial_graph_manager,
807                &node_actors,
808                &state_table_ids,
809                final_checkpoint,
810                None,
811                None,
812                take(&mut notifiers),
813                None,
814                false,
815            )?;
816            Self::inject_barrier(
817                self.partial_graph_id,
818                partial_graph_manager,
819                &node_actors,
820                &state_table_ids,
821                stop_barrier,
822                None,
823                Some(Mutation::Stop(StopMutation {
824                    actors: stop_actors,
825                    dropped_sink_fragments: vec![],
826                })),
827                vec![],
828                None,
829                true,
830            )?;
831
832            self.status = BatchRefreshJobStatus::FinishingSnapshot {
833                tracking_job: Some(tracking_job),
834                fragment_infos,
835            };
836        } else {
837            // Normal barrier — still consuming snapshot.
838            let BatchRefreshJobStatus::ConsumingSnapshot {
839                prev_epoch_fake_physical_time,
840                pending_non_checkpoint_barriers,
841                create_mview_tracker,
842                node_actors,
843                state_table_ids,
844                ..
845            } = &mut self.status
846            else {
847                unreachable!("is_finished was false, status must be ConsumingSnapshot")
848            };
849
850            // Forward a fake barrier to the partial graph.
851            let mutation = mutation.take().or_else(|| {
852                let pending_backfill_nodes = create_mview_tracker
853                    .take_pending_backfill_nodes()
854                    .collect_vec();
855                if pending_backfill_nodes.is_empty() {
856                    None
857                } else {
858                    Some(Mutation::StartFragmentBackfill(
859                        StartFragmentBackfillMutation {
860                            fragment_ids: pending_backfill_nodes,
861                        },
862                    ))
863                }
864            });
865            let barrier_to_inject = super::new_fake_barrier(
866                prev_epoch_fake_physical_time,
867                pending_non_checkpoint_barriers,
868                match barrier_info.kind {
869                    BarrierKind::Barrier => PbBarrierKind::Barrier,
870                    BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
871                    BarrierKind::Initial => {
872                        unreachable!("upstream new epoch should not be initial")
873                    }
874                },
875            );
876            Self::inject_barrier(
877                self.partial_graph_id,
878                partial_graph_manager,
879                node_actors,
880                state_table_ids,
881                barrier_to_inject,
882                None,
883                mutation,
884                take(&mut notifiers),
885                None,
886                false,
887            )?;
888        }
889        assert!(mutation.is_none(), "must have consumed mutation");
890        assert!(notifiers.is_empty(), "must consumed notifiers");
891        Ok(())
892    }
893
894    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
895        match &mut self.status {
896            BatchRefreshJobStatus::ConsumingSnapshot {
897                create_mview_tracker,
898                version_stats,
899                ..
900            } => {
901                for progress in collected_barrier
902                    .resps
903                    .values()
904                    .flat_map(|resp| &resp.create_mview_progress)
905                {
906                    create_mview_tracker.apply_progress(progress, version_stats);
907                }
908                create_mview_tracker.is_finished()
909            }
910            BatchRefreshJobStatus::InitializingBatchRefresh { .. }
911            | BatchRefreshJobStatus::ConsumingLogStore { .. } => {
912                // All barriers are pre-injected; no progress tracking needed.
913                false
914            }
915            _ => false,
916        }
917    }
918}
919
920// ── Completing ────────────────────────────────────────────────────────────────
921
922impl BatchRefreshJobCheckpointControl {
923    #[expect(clippy::type_complexity)]
924    pub(crate) fn start_completing(
925        &mut self,
926        partial_graph_manager: &mut PartialGraphManager,
927    ) -> Option<(
928        u64,
929        HashMap<WorkerId, BarrierCompleteResponse>,
930        PartialGraphBarrierInfo,
931        Option<TrackingJob>,
932    )> {
933        match &self.status {
934            BatchRefreshJobStatus::ConsumingSnapshot { .. }
935            | BatchRefreshJobStatus::FinishingSnapshot { .. }
936            | BatchRefreshJobStatus::ConsumingLogStore { .. } => {}
937            BatchRefreshJobStatus::Idle { .. }
938            | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
939            | BatchRefreshJobStatus::Resetting { .. } => {
940                return None;
941            }
942        };
943
944        partial_graph_manager
945            .start_completing(
946                self.partial_graph_id,
947                std::ops::Bound::Unbounded,
948                |_non_checkpoint_epoch, _resps, _| {
949                    // Progress already applied in `collect()`.
950                },
951            )
952            .map(|(epoch, resps, info)| {
953                // Take tracking job only when the snapshot stop barrier completes
954                // (i.e., we are in FinishingSnapshot and the epoch matches snapshot_epoch).
955                // Note: ConsumingLogStore's stop barrier also has prev_epoch == target_upstream_epoch,
956                // which may coincidentally equal snapshot_epoch if no new upstream commits occurred.
957                // We must check the status, not just the epoch, to avoid a false positive.
958                let tracking_job = match &mut self.status {
959                    BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. }
960                        if epoch == self.snapshot_epoch =>
961                    {
962                        Some(
963                            tracking_job
964                                .take()
965                                .expect("tracking job should not have been taken yet"),
966                        )
967                    }
968                    _ => None,
969                };
970                (epoch, resps, info, tracking_job)
971            })
972    }
973
974    pub(super) fn ack_completed(
975        &mut self,
976        partial_graph_manager: &mut PartialGraphManager,
977        completed_epoch: u64,
978    ) {
979        match &self.status {
980            BatchRefreshJobStatus::ConsumingSnapshot { .. } => {
981                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
982            }
983            BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. }
984                if completed_epoch == self.snapshot_epoch =>
985            {
986                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
987                assert!(
988                    tracking_job.is_none(),
989                    "tracking job should have been taken at start_completing"
990                );
991                info!(
992                    job_id = %self.job_id,
993                    completed_epoch,
994                    "batch refresh job: snapshot done, transitioned to idle, removing partial graph"
995                );
996                partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
997                self.status = BatchRefreshJobStatus::Idle {
998                    last_committed_epoch: completed_epoch,
999                };
1000            }
1001            BatchRefreshJobStatus::FinishingSnapshot { .. } => {
1002                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
1003            }
1004            BatchRefreshJobStatus::ConsumingLogStore {
1005                target_upstream_epoch,
1006                ..
1007            } if completed_epoch == *target_upstream_epoch => {
1008                let target = *target_upstream_epoch;
1009                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
1010                info!(
1011                    job_id = %self.job_id,
1012                    completed_epoch,
1013                    target_upstream_epoch = target,
1014                    "batch refresh job: logstore done, transitioned to idle, removing partial graph"
1015                );
1016                partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
1017                self.status = BatchRefreshJobStatus::Idle {
1018                    last_committed_epoch: target,
1019                };
1020            }
1021            BatchRefreshJobStatus::ConsumingLogStore { .. } => {
1022                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
1023            }
1024            BatchRefreshJobStatus::Resetting { .. } => {
1025                // The job was dropped while the completing task was running in the background.
1026                // The partial graph has already been reset, so skip the ack.
1027            }
1028            BatchRefreshJobStatus::Idle { .. }
1029            | BatchRefreshJobStatus::InitializingBatchRefresh { .. } => {
1030                unreachable!("batch refresh job should not be completing in this state")
1031            }
1032        }
1033    }
1034
1035    /// Called when the partial graph reset is confirmed (drop only).
1036    pub(super) fn on_partial_graph_reset(mut self) {
1037        match &mut self.status {
1038            BatchRefreshJobStatus::Resetting { notifiers } => {
1039                for notifier in notifiers.drain(..) {
1040                    notifier.notify_collected();
1041                }
1042            }
1043            _ => {
1044                panic!(
1045                    "batch refresh job {}: on_partial_graph_reset in unexpected state {:?}",
1046                    self.job_id, self.status
1047                );
1048            }
1049        }
1050    }
1051}
1052
1053// ── Query methods ─────────────────────────────────────────────────────────────
1054
1055impl BatchRefreshJobCheckpointControl {
1056    pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
1057        match &self.status {
1058            BatchRefreshJobStatus::ConsumingSnapshot {
1059                create_mview_tracker,
1060                ..
1061            } => {
1062                let progress = if create_mview_tracker.is_finished() {
1063                    "Snapshot finished".to_owned()
1064                } else {
1065                    let progress = create_mview_tracker.gen_backfill_progress();
1066                    format!("BatchRefresh Snapshot [{}]", progress)
1067                };
1068                Some(BackfillProgress {
1069                    progress,
1070                    backfill_type: PbBackfillType::SnapshotBackfill,
1071                })
1072            }
1073            BatchRefreshJobStatus::FinishingSnapshot { .. } => Some(BackfillProgress {
1074                progress: "BatchRefresh Stopping".to_owned(),
1075                backfill_type: PbBackfillType::SnapshotBackfill,
1076            }),
1077            BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1078            | BatchRefreshJobStatus::ConsumingLogStore { .. } => Some(BackfillProgress {
1079                progress: "BatchRefresh LogStore".to_owned(),
1080                backfill_type: PbBackfillType::SnapshotBackfill,
1081            }),
1082            BatchRefreshJobStatus::Idle { .. } | BatchRefreshJobStatus::Resetting { .. } => None,
1083        }
1084    }
1085
1086    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
1087        match &self.status {
1088            BatchRefreshJobStatus::ConsumingSnapshot {
1089                create_mview_tracker,
1090                fragment_infos,
1091                ..
1092            } => create_mview_tracker.collect_fragment_progress(fragment_infos, true),
1093            BatchRefreshJobStatus::FinishingSnapshot { fragment_infos, .. } => {
1094                collect_done_fragments(self.job_id, fragment_infos)
1095            }
1096            _ => vec![],
1097        }
1098    }
1099
1100    /// Returns the pinned upstream log epoch and upstream table IDs.
1101    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
1102        match &self.status {
1103            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1104            | BatchRefreshJobStatus::FinishingSnapshot { .. } => (
1105                self.snapshot_epoch,
1106                self.snapshot_backfill_upstream_tables.clone(),
1107            ),
1108            BatchRefreshJobStatus::ConsumingLogStore {
1109                logstore_start_epoch,
1110                ..
1111            }
1112            | BatchRefreshJobStatus::InitializingBatchRefresh {
1113                logstore_start_epoch,
1114                ..
1115            } => (
1116                *logstore_start_epoch,
1117                self.snapshot_backfill_upstream_tables.clone(),
1118            ),
1119            BatchRefreshJobStatus::Idle {
1120                last_committed_epoch,
1121            } => (
1122                *last_committed_epoch,
1123                self.snapshot_backfill_upstream_tables.clone(),
1124            ),
1125            BatchRefreshJobStatus::Resetting { .. } => (0, HashSet::new()),
1126        }
1127    }
1128
1129    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
1130        match &self.status {
1131            BatchRefreshJobStatus::ConsumingSnapshot { fragment_infos, .. } => Some(fragment_infos),
1132            BatchRefreshJobStatus::InitializingBatchRefresh { fragment_infos, .. } => {
1133                Some(fragment_infos)
1134            }
1135            BatchRefreshJobStatus::ConsumingLogStore { fragment_infos, .. } => Some(fragment_infos),
1136            BatchRefreshJobStatus::FinishingSnapshot { .. }
1137            | BatchRefreshJobStatus::Idle { .. }
1138            | BatchRefreshJobStatus::Resetting { .. } => None,
1139        }
1140    }
1141
1142    pub(crate) fn is_snapshot_backfilling(&self) -> bool {
1143        matches!(
1144            self.status,
1145            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1146                | BatchRefreshJobStatus::FinishingSnapshot { .. }
1147                | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1148                | BatchRefreshJobStatus::ConsumingLogStore { .. }
1149        )
1150    }
1151
1152    /// Whether this idle job should start a refresh run.
1153    ///
1154    /// Returns `true` if the job is idle and the upstream committed epoch is
1155    /// far enough ahead of the job's last committed epoch (by `batch_refresh_seconds`).
1156    pub(crate) fn should_start_refresh(&self, upstream_committed_epoch: u64) -> bool {
1157        if let BatchRefreshJobStatus::Idle {
1158            last_committed_epoch,
1159        } = &self.status
1160        {
1161            let job_physical_ms = Epoch(*last_committed_epoch).physical_time();
1162            let upstream_physical_ms = Epoch(upstream_committed_epoch).physical_time();
1163            let threshold_ms = self.batch_refresh_seconds * 1000;
1164            upstream_physical_ms.saturating_sub(job_physical_ms) >= threshold_ms
1165        } else {
1166            false
1167        }
1168    }
1169
1170    /// Returns the last committed epoch if the job is idle.
1171    pub(crate) fn last_committed_epoch(&self) -> Option<u64> {
1172        if let BatchRefreshJobStatus::Idle {
1173            last_committed_epoch,
1174        } = &self.status
1175        {
1176            Some(*last_committed_epoch)
1177        } else {
1178            None
1179        }
1180    }
1181}
1182
1183// ── Logstore refresh run ──────────────────────────────────────────────────────
1184
1185impl BatchRefreshJobCheckpointControl {
1186    /// Start a logstore consumption run.
1187    ///
1188    /// Preconditions: the job must be `Idle`.
1189    ///
1190    /// 1. Resolves log epochs from the hummock changelog
1191    /// 2. Re-renders actors using the cached context
1192    /// 3. Injects all barriers at once (first with `AddMutation`, last with `StopMutation`)
1193    /// 4. Transitions to `ConsumingLogStore`
1194    ///
1195    /// Returns `true` if a refresh run was started, `false` if there are no
1196    /// log epochs to consume (early return, stays idle).
1197    pub(crate) fn start_refresh_run(
1198        &mut self,
1199        context: &BatchRefreshJobTriggerContext,
1200        worker_nodes: &HashMap<WorkerId, WorkerNode>,
1201        actor_id_counter: &AtomicU32,
1202        partial_graph_manager: &mut PartialGraphManager,
1203    ) -> MetaResult<bool> {
1204        let last_committed_epoch = match &self.status {
1205            BatchRefreshJobStatus::Idle {
1206                last_committed_epoch,
1207            } => *last_committed_epoch,
1208            _ => panic!(
1209                "batch refresh job {}: start_refresh_run called in non-Idle state {:?}",
1210                self.job_id, self.status
1211            ),
1212        };
1213
1214        // Resolve log epochs into barrier infos.
1215        let target_upstream_epoch = context.target_upstream_epoch;
1216        let Some((first_epoch, pending_log_barriers)) = Self::resolve_log_epoch_barriers(
1217            &self.snapshot_backfill_upstream_tables,
1218            &context.upstream_table_log_epochs,
1219            last_committed_epoch,
1220        )?
1221        else {
1222            info!(
1223                job_id = %self.job_id,
1224                last_committed_epoch,
1225                target_upstream_epoch,
1226                "batch refresh job: no log epochs to consume, staying idle"
1227            );
1228            return Ok(false);
1229        };
1230
1231        let log_target_epoch = pending_log_barriers.last().expect("non-empty").prev_epoch();
1232        if target_upstream_epoch != log_target_epoch {
1233            info!(
1234                job_id = %self.job_id,
1235                last_committed_epoch,
1236                target_upstream_epoch,
1237                log_target_epoch,
1238                "batch refresh job: upstream target has no resolved changelog yet, staying idle"
1239            );
1240            return Ok(false);
1241        }
1242
1243        // Build logical fragments from cached context.
1244        let logical = BatchRefreshLogicalFragments::from_context(context);
1245
1246        // Re-render actors.
1247        let render_result = Self::render_actors_and_build_job_info(
1248            &logical.fragments,
1249            &logical.downstreams,
1250            &context.definition,
1251            actor_id_counter,
1252            worker_nodes,
1253            &context.database_resource_group,
1254            &context.streaming_job_model,
1255            self.partial_graph_id,
1256        )?;
1257
1258        // Build actors_to_create and initial mutation.
1259        let added_actors: Vec<ActorId> = render_result
1260            .fragment_infos
1261            .values()
1262            .flat_map(|fragment| fragment.actors.keys().copied())
1263            .collect();
1264
1265        let initial_mutation = Mutation::Add(AddMutation {
1266            actor_dispatchers: Default::default(),
1267            added_actors,
1268            actor_splits: Default::default(),
1269            pause: false,
1270            subscriptions_to_add: Default::default(),
1271            backfill_nodes_to_pause: Default::default(),
1272            actor_cdc_table_snapshot_splits: None,
1273            new_upstream_sinks: Default::default(),
1274        });
1275
1276        let node_actors = &render_result.node_actors;
1277        let state_table_ids = &render_result.state_table_ids;
1278        let initial_barrier = BarrierInfo {
1279            prev_epoch: TracedEpoch::new(Epoch(last_committed_epoch)),
1280            curr_epoch: TracedEpoch::new(Epoch(first_epoch)),
1281            kind: BarrierKind::Initial,
1282        };
1283        let mut partial_graph_recoverer = partial_graph_manager.start_recover();
1284        let recover_result = partial_graph_recoverer.recover_graph(
1285            self.partial_graph_id,
1286            initial_mutation,
1287            &initial_barrier,
1288            node_actors,
1289            state_table_ids.iter().copied(),
1290            render_result.actors_to_create,
1291            BatchRefreshBarrierStats::new(self.job_id, self.snapshot_epoch),
1292        );
1293        match recover_result {
1294            Ok(()) => {
1295                let initializing_partial_graphs = partial_graph_recoverer.all_initializing();
1296                debug_assert_eq!(initializing_partial_graphs.len(), 1);
1297                debug_assert!(initializing_partial_graphs.contains(&self.partial_graph_id));
1298            }
1299            Err(e) => {
1300                partial_graph_recoverer.failed();
1301                return Err(e);
1302            }
1303        }
1304
1305        let logstore_start_epoch = last_committed_epoch;
1306
1307        info!(
1308            job_id = %self.job_id,
1309            last_committed_epoch,
1310            target_upstream_epoch,
1311            num_log_barriers = pending_log_barriers.len(),
1312            "batch refresh job: initialized logstore consumption partial graph"
1313        );
1314
1315        self.status = BatchRefreshJobStatus::InitializingBatchRefresh {
1316            fragment_infos: render_result.fragment_infos,
1317            node_actors: render_result.node_actors,
1318            state_table_ids: render_result.state_table_ids,
1319            pending_log_barriers,
1320            logstore_start_epoch,
1321            target_upstream_epoch,
1322        };
1323
1324        Ok(true)
1325    }
1326
1327    pub(crate) fn on_log_store_initialized(
1328        &mut self,
1329        partial_graph_manager: &mut PartialGraphManager,
1330    ) -> MetaResult<()> {
1331        let old_status = replace(
1332            &mut self.status,
1333            BatchRefreshJobStatus::Idle {
1334                last_committed_epoch: 0,
1335            },
1336        );
1337        let BatchRefreshJobStatus::InitializingBatchRefresh {
1338            fragment_infos,
1339            node_actors,
1340            state_table_ids,
1341            pending_log_barriers,
1342            logstore_start_epoch,
1343            target_upstream_epoch,
1344        } = old_status
1345        else {
1346            panic!(
1347                "batch refresh job {}: logstore initialized in unexpected status {:?}",
1348                self.job_id, old_status
1349            );
1350        };
1351
1352        let final_barrier_idx = pending_log_barriers.len() - 1;
1353        let mut stop_mutation = Some(Mutation::Stop(StopMutation {
1354            actors: fragment_infos
1355                .values()
1356                .flat_map(|fragment| fragment.actors.keys().copied())
1357                .collect(),
1358            dropped_sink_fragments: vec![],
1359        }));
1360        for (idx, barrier) in pending_log_barriers.into_iter().enumerate() {
1361            let is_stop_barrier = idx == final_barrier_idx;
1362            let mutation = is_stop_barrier.then(|| stop_mutation.take().expect("unused"));
1363            Self::inject_barrier(
1364                self.partial_graph_id,
1365                partial_graph_manager,
1366                &node_actors,
1367                &state_table_ids,
1368                barrier,
1369                None,
1370                mutation,
1371                vec![],
1372                None,
1373                is_stop_barrier,
1374            )?;
1375        }
1376
1377        self.status = BatchRefreshJobStatus::ConsumingLogStore {
1378            fragment_infos,
1379            logstore_start_epoch,
1380            target_upstream_epoch,
1381        };
1382        Ok(())
1383    }
1384
1385    /// Resolve upstream log epochs from the hummock changelog into barrier infos.
1386    ///
1387    /// Returns `(first_epoch, log_barriers)`. `first_epoch` is consumed by the
1388    /// initial barrier. `log_barriers` contains all barriers to inject after
1389    /// initialization, ending with the final checkpoint stop barrier.
1390    fn resolve_log_epoch_barriers(
1391        snapshot_backfill_upstream_tables: &HashSet<TableId>,
1392        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
1393        exclusive_start_log_epoch: u64,
1394    ) -> MetaResult<Option<(u64, Vec<BarrierInfo>)>> {
1395        let table_id = snapshot_backfill_upstream_tables
1396            .iter()
1397            .next()
1398            .expect("snapshot backfill job should have upstream");
1399        let Some(epochs) = upstream_table_log_epochs.get(table_id) else {
1400            return Ok(None);
1401        };
1402
1403        // Find the starting point: skip entries up to and including exclusive_start_log_epoch.
1404        let mut epochs_iter = epochs.iter().peekable();
1405        loop {
1406            match epochs_iter.peek() {
1407                Some((_, checkpoint_epoch)) if *checkpoint_epoch <= exclusive_start_log_epoch => {
1408                    epochs_iter.next();
1409                }
1410                _ => break,
1411            }
1412        }
1413
1414        let mut epoch_infos = vec![];
1415        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
1416            epoch_infos.extend(
1417                non_checkpoint_epochs
1418                    .iter()
1419                    .copied()
1420                    .map(|epoch| (epoch, false)),
1421            );
1422            epoch_infos.push((*checkpoint_epoch, true));
1423        }
1424        if epoch_infos.is_empty() {
1425            return Ok(None);
1426        }
1427
1428        let first_epoch = epoch_infos[0].0;
1429        let mut pending_non_checkpoint_epochs = vec![];
1430        let mut replay_barriers = vec![];
1431        for window in epoch_infos.windows(2) {
1432            let (prev_epoch, is_checkpoint) = window[0];
1433            let curr_epoch = window[1].0;
1434            assert!(prev_epoch > exclusive_start_log_epoch);
1435            assert!(curr_epoch > prev_epoch);
1436            pending_non_checkpoint_epochs.push(prev_epoch);
1437            let kind = if is_checkpoint {
1438                BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_epochs))
1439            } else {
1440                BarrierKind::Barrier
1441            };
1442            replay_barriers.push(BarrierInfo {
1443                prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
1444                curr_epoch: TracedEpoch::new(Epoch(curr_epoch)),
1445                kind,
1446            });
1447        }
1448
1449        let (last_epoch, _) = *epoch_infos.last().expect("non-empty");
1450        assert!(last_epoch > exclusive_start_log_epoch);
1451        pending_non_checkpoint_epochs.push(last_epoch);
1452        replay_barriers.push(BarrierInfo {
1453            prev_epoch: TracedEpoch::new(Epoch(last_epoch)),
1454            curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
1455            kind: BarrierKind::Checkpoint(pending_non_checkpoint_epochs),
1456        });
1457
1458        Ok(Some((first_epoch, replay_barriers)))
1459    }
1460}
1461
1462impl BatchRefreshLogicalFragments {
1463    /// Build logical fragments from a trigger context.
1464    pub(crate) fn from_context(ctx: &BatchRefreshJobTriggerContext) -> Self {
1465        Self {
1466            fragments: ctx.fragments.clone(),
1467            downstreams: ctx.downstreams.clone(),
1468        }
1469    }
1470}
1471
1472// ── Drop handling ─────────────────────────────────────────────────────────────
1473
1474impl BatchRefreshJobCheckpointControl {
1475    /// Drop this batch refresh job.
1476    pub(super) fn drop(
1477        &mut self,
1478        notifiers: &mut Vec<Notifier>,
1479        partial_graph_manager: &mut PartialGraphManager,
1480    ) -> bool {
1481        match &mut self.status {
1482            BatchRefreshJobStatus::Resetting {
1483                notifiers: existing_notifiers,
1484                ..
1485            } => {
1486                for notifier in &mut *notifiers {
1487                    notifier.notify_started();
1488                }
1489                existing_notifiers.append(notifiers);
1490                true
1491            }
1492            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1493            | BatchRefreshJobStatus::FinishingSnapshot { .. }
1494            | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1495            | BatchRefreshJobStatus::ConsumingLogStore { .. } => {
1496                for notifier in &mut *notifiers {
1497                    notifier.notify_started();
1498                }
1499                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1500                self.status = BatchRefreshJobStatus::Resetting {
1501                    notifiers: take(notifiers),
1502                };
1503                true
1504            }
1505            BatchRefreshJobStatus::Idle { .. } => {
1506                // Idle has no running partial graph, but we still go through
1507                // the reset flow so the cleanup path is uniform.
1508                for notifier in &mut *notifiers {
1509                    notifier.notify_started();
1510                }
1511                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1512                self.status = BatchRefreshJobStatus::Resetting {
1513                    notifiers: take(notifiers),
1514                };
1515                true
1516            }
1517        }
1518    }
1519
1520    /// Reset during database recovery.
1521    ///
1522    /// Returns `true` if the partial graph was already resetting (from a prior drop),
1523    /// meaning we should not issue a new reset request.
1524    pub(crate) fn reset(self) -> bool {
1525        match self.status {
1526            BatchRefreshJobStatus::ConsumingSnapshot { .. }
1527            | BatchRefreshJobStatus::FinishingSnapshot { .. }
1528            | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1529            | BatchRefreshJobStatus::ConsumingLogStore { .. }
1530            | BatchRefreshJobStatus::Idle { .. } => false,
1531            BatchRefreshJobStatus::Resetting { notifiers, .. } => {
1532                for notifier in notifiers {
1533                    notifier.notify_collected();
1534                }
1535                true
1536            }
1537        }
1538    }
1539}
1540
1541// ── Barrier stats ─────────────────────────────────────────────────────────────
1542
1543struct BatchRefreshBarrierStats {
1544    barrier_latency: LabelGuardedHistogram,
1545    inflight_barrier_num: LabelGuardedIntGauge,
1546}
1547
1548impl BatchRefreshBarrierStats {
1549    fn new(job_id: JobId, _snapshot_epoch: u64) -> Self {
1550        let table_id_str = format!("{}", job_id);
1551        Self {
1552            barrier_latency: GLOBAL_META_METRICS
1553                .snapshot_backfill_barrier_latency
1554                .with_guarded_label_values(&[table_id_str.as_str(), "batch_refresh_snapshot"]),
1555            inflight_barrier_num: GLOBAL_META_METRICS
1556                .snapshot_backfill_inflight_barrier_num
1557                .with_guarded_label_values(&[&table_id_str]),
1558        }
1559    }
1560}
1561
1562impl PartialGraphStat for BatchRefreshBarrierStats {
1563    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
1564        self.barrier_latency.observe(barrier_latency_secs);
1565    }
1566
1567    fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
1568        self.inflight_barrier_num.set(inflight_barrier_num as _);
1569    }
1570}