1use std::collections::{HashMap, HashSet};
24use std::mem::{replace, take};
25use std::sync::atomic::AtomicU32;
26
27use anyhow::anyhow;
28use itertools::Itertools;
29use risingwave_common::catalog::{DatabaseId, TableId};
30use risingwave_common::id::JobId;
31use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
32use risingwave_common::util::epoch::{Epoch, EpochPair};
33use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
34use risingwave_pb::common::WorkerNode;
35use risingwave_pb::ddl_service::PbBackfillType;
36use risingwave_pb::hummock::HummockVersionStats;
37use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
38use risingwave_pb::stream_plan::barrier::PbBarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation;
40use risingwave_pb::stream_plan::{AddMutation, StartFragmentBackfillMutation, StopMutation};
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{debug, info};
43
44use crate::MetaResult;
45use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
46use crate::barrier::command::PostCollectCommand;
47use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
48use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
49use crate::barrier::info::BarrierInfo;
50use crate::barrier::notifier::Notifier;
51use crate::barrier::partial_graph::{
52 CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphStat,
53};
54use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
55use crate::barrier::rpc::to_partial_graph_id;
56use crate::barrier::{
57 BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
58};
59use crate::controller::fragment::InflightFragmentInfo;
60use crate::controller::scale::{
61 ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
62 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
63};
64use crate::model::{
65 FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate, StreamingJobModelContextExt,
66};
67use crate::rpc::metrics::GLOBAL_META_METRICS;
68use crate::stream::ExtendedFragmentBackfillOrder;
69
/// Logical description of a batch refresh job's fragment graph, before any
/// actors have been rendered for it.
///
/// `BatchRefreshJobCheckpointControl::new` takes this by reference and passes
/// both fields to `render_actors_and_build_job_info`.
#[derive(Debug)]
pub(crate) struct BatchRefreshLogicalFragments {
    /// Loaded fragments of the job, keyed by fragment id.
    pub fragments: HashMap<FragmentId, LoadedFragment>,
    /// Downstream relations of the job's fragments, iterated as
    /// `(upstream fragment id, relations)` pairs when resolving ensembles.
    pub downstreams: FragmentDownstreamRelation,
}
87
/// Output of `BatchRefreshJobCheckpointControl::render_actors_and_build_job_info`:
/// everything needed to inject or recover the job's partial graph.
#[derive(Debug)]
pub(crate) struct BatchRefreshRenderResult {
    /// Rendered in-flight info (including actors) for every fragment of the job.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Actors per worker, as computed by `InflightFragmentInfo::actor_ids_to_collect`.
    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of all fragments, as computed by
    /// `InflightFragmentInfo::existing_table_ids`.
    pub state_table_ids: HashSet<TableId>,
    /// Actors to be created, as collected from the built fragment edges.
    pub actors_to_create: StreamJobActorsToCreate,
}
99
/// Lifecycle state of a batch refresh job.
///
/// Transitions visible in this file:
/// `ConsumingSnapshot` -> `FinishingSnapshot` (in `on_new_upstream_barrier`, once
/// the progress tracker reports finished) -> `Idle` (in `ack_completed`, when the
/// snapshot epoch is acknowledged). `drop` moves any state to `Resetting`.
#[derive(Debug)]
enum BatchRefreshJobStatus {
    /// The job is backfilling the upstream snapshot. This is the state produced by
    /// `new`, and by `recover` when the committed epoch is below the snapshot epoch.
    ConsumingSnapshot {
        /// Fake physical time handed mutably to `new_fake_barrier` for every barrier
        /// injected in this state; also converted into the `prev_epoch` of the final
        /// checkpoint barrier.
        prev_epoch_fake_physical_time: u64,
        /// Hummock version stats passed to the tracker when applying progress.
        version_stats: HummockVersionStats,
        /// Tracks backfill progress; `is_finished()` triggers the finishing sequence.
        create_mview_tracker: CreateMviewProgressTracker,
        /// Epoch used as `curr_epoch` of the final checkpoint barrier.
        snapshot_epoch: u64,
        /// Rendered fragments (and their actors) of the job.
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        /// Epochs accumulated since the last checkpoint barrier; drained into the
        /// `BarrierKind::Checkpoint` of the final checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
        /// Actors per worker, passed to every barrier injection.
        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
        /// State tables of the job, passed to every barrier injection.
        state_table_ids: HashSet<TableId>,
    },
    /// The final checkpoint barrier and the stop barrier have been injected and are
    /// waiting to be completed.
    FinishingSnapshot {
        /// Taken in `start_completing` when the completing epoch equals the snapshot
        /// epoch; `ack_completed` asserts it is `None` by then.
        tracking_job: Option<TrackingJob>,
        /// Rendered fragments, kept for reporting fragment backfill progress.
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    },
    /// The snapshot has been consumed and the partial graph removed, or the job was
    /// recovered with a committed epoch at or beyond the snapshot epoch.
    Idle,
    /// `drop` has requested a partial graph reset; the stored notifiers are notified
    /// as collected once the reset is observed.
    Resetting { notifiers: Vec<Notifier> },
}
133
/// Checkpoint control of a single batch refresh job, driving its own partial graph
/// through the states of `BatchRefreshJobStatus`.
#[derive(Debug)]
pub(crate) struct BatchRefreshJobCheckpointControl {
    /// Id of the streaming job.
    job_id: JobId,
    /// Partial graph id derived from the database id and `job_id` via
    /// `to_partial_graph_id`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables reported together with `snapshot_epoch` by
    /// `pinned_upstream_log_epoch`.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the snapshot being consumed; completing this epoch moves the job to
    /// `Idle`.
    snapshot_epoch: u64,

    /// Latest epoch acknowledged via `ack_completed`; `None` for a newly created job
    /// and seeded with the committed epoch on recovery.
    max_committed_epoch: Option<u64>,
    /// Current lifecycle state.
    status: BatchRefreshJobStatus,
}
154
155impl BatchRefreshJobCheckpointControl {
158 pub(crate) fn render_actors_and_build_job_info(
169 fragments: &HashMap<FragmentId, LoadedFragment>,
170 downstreams: &FragmentDownstreamRelation,
171 definition: &str,
172 actor_id_generator: &AtomicU32,
174 worker_nodes: &HashMap<WorkerId, WorkerNode>,
175 database_resource_group: &str,
176 streaming_job_model: &streaming_job::Model,
177 partial_graph_id: PartialGraphId,
179 ) -> MetaResult<BatchRefreshRenderResult> {
180 let ensembles = Self::resolve_ensembles(fragments, downstreams)?;
182
183 let mut actor_assignments: HashMap<
185 FragmentId,
186 HashMap<ActorId, (WorkerId, Option<risingwave_common::bitmap::Bitmap>)>,
187 > = HashMap::new();
188
189 for ensemble in &ensembles {
190 let first_component = ensemble
192 .component_fragments()
193 .next()
194 .expect("ensemble must have at least one component");
195 let fragment = &fragments[&first_component];
196 let distribution_type = fragment.distribution_type;
197 let vnode_count = fragment.vnode_count;
198
199 for fid in ensemble.component_fragments() {
201 let f = &fragments[&fid];
202 assert_eq!(
203 vnode_count, f.vnode_count,
204 "fragments {} and {} in same ensemble have different vnode counts",
205 first_component, fid,
206 );
207 }
208
209 let entry_fragment_parallelism = ensemble
210 .entry_fragments()
211 .map(|fid| fragments[&fid].parallelism.clone())
212 .dedup()
213 .exactly_one()
214 .map_err(|_| {
215 anyhow!(
216 "entry fragments have inconsistent parallelism settings in batch refresh job"
217 )
218 })?;
219
220 let actor_template = EnsembleActorTemplate::render_new(
221 streaming_job_model,
222 worker_nodes,
223 entry_fragment_parallelism,
224 database_resource_group.to_owned(),
225 distribution_type,
226 vnode_count,
227 )?;
228
229 for fid in ensemble.component_fragments() {
230 let f = &fragments[&fid];
231 let aligner =
232 ComponentFragmentAligner::new_persistent(&actor_template, actor_id_generator);
233 let assignments = aligner.align_component_actor(f.distribution_type);
234 actor_assignments.insert(fid, assignments);
235 }
236 }
237
238 let mut stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
240 let mut actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
241
242 for (fragment_id, assignments) in &actor_assignments {
243 let mut actors = Vec::with_capacity(assignments.len());
244 for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
245 actor_location.insert(actor_id, *worker_id);
246 let stream_context = streaming_job_model.stream_context();
247 actors.push(StreamActor {
248 actor_id,
249 fragment_id: *fragment_id,
250 vnode_bitmap: vnode_bitmap.clone(),
251 mview_definition: definition.to_owned(),
252 expr_context: Some(stream_context.to_expr_context()),
253 config_override: stream_context.config_override.clone(),
254 });
255 }
256 stream_actors.insert(*fragment_id, actors);
257 }
258
259 let fragment_infos: HashMap<FragmentId, InflightFragmentInfo> = fragments
261 .iter()
262 .map(|(fragment_id, loaded)| {
263 let actors = stream_actors
264 .get(fragment_id)
265 .into_iter()
266 .flatten()
267 .map(|actor| {
268 (
269 actor.actor_id,
270 crate::controller::fragment::InflightActorInfo {
271 worker_id: actor_location[&actor.actor_id],
272 vnode_bitmap: actor.vnode_bitmap.clone(),
273 splits: vec![], },
275 )
276 })
277 .collect();
278 (
279 *fragment_id,
280 InflightFragmentInfo {
281 fragment_id: *fragment_id,
282 distribution_type: loaded.distribution_type,
283 fragment_type_mask: loaded.fragment_type_mask,
284 vnode_count: loaded.vnode_count,
285 nodes: loaded.nodes.clone(),
286 actors,
287 state_table_ids: loaded.state_table_ids.clone(),
288 },
289 )
290 })
291 .collect();
292
293 let mut builder = FragmentEdgeBuilder::new(fragment_infos.values().map(|f| {
295 (
296 f.fragment_id,
297 EdgeBuilderFragmentInfo::from_inflight_with_worker_nodes(
298 f,
299 partial_graph_id,
300 worker_nodes,
301 ),
302 )
303 }));
304 builder.add_relations(downstreams);
305 let mut edges = builder.build();
306
307 let actors_to_create = edges.collect_actors_to_create(fragment_infos.values().map(|f| {
308 (
309 f.fragment_id,
310 &f.nodes,
311 f.actors.iter().map(|(actor_id, actor)| {
312 let sa = stream_actors[&f.fragment_id]
313 .iter()
314 .find(|a| a.actor_id == *actor_id)
315 .expect("should exist");
316 (sa, actor.worker_id)
317 }),
318 vec![], )
320 }));
321
322 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
324 let state_table_ids =
325 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
326
327 Ok(BatchRefreshRenderResult {
328 fragment_infos,
329 node_actors,
330 state_table_ids,
331 actors_to_create,
332 })
333 }
334
335 pub(crate) fn build_initial_partial_graph_mutation(
340 render_result: &BatchRefreshRenderResult,
341 backfill_ordering: &ExtendedFragmentBackfillOrder,
342 ) -> Mutation {
343 let added_actors: Vec<ActorId> = render_result
344 .fragment_infos
345 .values()
346 .flat_map(|f| f.actors.keys().copied())
347 .collect();
348 let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_ordering)
349 .into_iter()
350 .collect();
351 Mutation::Add(AddMutation {
352 actor_dispatchers: Default::default(),
353 added_actors,
354 actor_splits: Default::default(),
355 pause: false,
356 subscriptions_to_add: Default::default(),
357 backfill_nodes_to_pause,
358 actor_cdc_table_snapshot_splits: None,
359 new_upstream_sinks: Default::default(),
360 })
361 }
362
363 fn resolve_ensembles(
365 fragments: &HashMap<FragmentId, LoadedFragment>,
366 downstreams: &FragmentDownstreamRelation,
367 ) -> MetaResult<Vec<NoShuffleEnsemble>> {
368 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
369 for (upstream_fid, relations) in downstreams {
370 for rel in relations {
371 if rel.dispatcher_type == DispatcherType::NoShuffle {
372 new_no_shuffle
373 .entry(*upstream_fid)
374 .or_default()
375 .insert(rel.downstream_fragment_id);
376 }
377 }
378 }
379
380 let mut ensembles = if new_no_shuffle.is_empty() {
381 Vec::new()
382 } else {
383 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
384 .iter()
385 .flat_map(|(u, ds)| ds.iter().map(move |d| (*u, *d)))
386 .collect();
387 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
388 .iter()
389 .flat_map(|(u, d)| [*u, *d])
390 .collect::<HashSet<_>>()
391 .into_iter()
392 .collect();
393 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
394 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
395 };
396
397 let covered: HashSet<FragmentId> = ensembles
399 .iter()
400 .flat_map(|e| e.component_fragments())
401 .collect();
402 for fragment_id in fragments.keys() {
403 if !covered.contains(fragment_id) {
404 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
405 }
406 }
407
408 Ok(ensembles)
409 }
410}
411
412impl BatchRefreshJobCheckpointControl {
415 pub(crate) fn new(
420 database_id: DatabaseId,
421 job_id: JobId,
422 create_info: CreateSnapshotBackfillJobCommandInfo,
423 notifiers: Vec<Notifier>,
424 snapshot_backfill_upstream_tables: HashSet<TableId>,
425 snapshot_epoch: u64,
426 version_stat: &HummockVersionStats,
427 partial_graph_manager: &mut PartialGraphManager,
428 logical: &BatchRefreshLogicalFragments,
429 worker_nodes: &HashMap<WorkerId, WorkerNode>,
430 ) -> MetaResult<Self> {
431 debug!(
432 %job_id,
433 "new batch refresh job"
434 );
435
436 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
437 let backfill_ordering = &create_info.info.fragment_backfill_ordering;
438 let actor_id_generator = partial_graph_manager
439 .control_stream_manager()
440 .env
441 .actor_id_generator();
442
443 let render_result = Self::render_actors_and_build_job_info(
444 &logical.fragments,
445 &logical.downstreams,
446 &create_info.info.definition,
447 actor_id_generator,
448 worker_nodes,
449 &create_info.info.database_resource_group,
450 &create_info.info.streaming_job_model,
451 partial_graph_id,
452 )?;
453 let initial_partial_graph_mutation =
454 Self::build_initial_partial_graph_mutation(&render_result, backfill_ordering);
455
456 let backfill_order_state = BackfillOrderState::new(
457 backfill_ordering,
458 &render_result.fragment_infos,
459 create_info
460 .info
461 .locality_fragment_state_table_mapping
462 .clone(),
463 );
464 let create_mview_tracker = CreateMviewProgressTracker::recover(
465 job_id,
466 &render_result.fragment_infos,
467 backfill_order_state,
468 version_stat,
469 );
470
471 let mut prev_epoch_fake_physical_time = 0;
472 let mut pending_non_checkpoint_barriers = vec![];
473
474 let initial_barrier_info = super::new_fake_barrier(
475 &mut prev_epoch_fake_physical_time,
476 &mut pending_non_checkpoint_barriers,
477 PbBarrierKind::Checkpoint,
478 );
479
480 let mut graph_adder = partial_graph_manager.add_partial_graph(
481 partial_graph_id,
482 BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
483 );
484
485 if let Err(e) = Self::inject_barrier(
486 partial_graph_id,
487 graph_adder.manager(),
488 &render_result.node_actors,
489 &render_result.state_table_ids,
490 initial_barrier_info,
491 Some(render_result.actors_to_create),
492 Some(initial_partial_graph_mutation),
493 notifiers,
494 Some(create_info),
495 false,
496 ) {
497 graph_adder.failed();
498 return Err(e);
499 }
500
501 graph_adder.added();
502 assert!(pending_non_checkpoint_barriers.is_empty());
503 let this = Self {
504 partial_graph_id,
505 job_id,
506 snapshot_backfill_upstream_tables,
507 snapshot_epoch,
508 max_committed_epoch: None,
509 status: BatchRefreshJobStatus::ConsumingSnapshot {
510 prev_epoch_fake_physical_time,
511 version_stats: version_stat.clone(),
512 create_mview_tracker,
513 snapshot_epoch,
514 fragment_infos: render_result.fragment_infos,
515 pending_non_checkpoint_barriers,
516 node_actors: render_result.node_actors,
517 state_table_ids: render_result.state_table_ids,
518 },
519 };
520 Ok(this)
521 }
522
523 pub(crate) fn recover(
528 database_id: DatabaseId,
529 job_id: JobId,
530 snapshot_backfill_upstream_tables: HashSet<TableId>,
531 snapshot_epoch: u64,
532 committed_epoch: u64,
533 backfill_order: ExtendedFragmentBackfillOrder,
534 version_stat: &HummockVersionStats,
535 initial_mutation: Mutation,
536 render_result: BatchRefreshRenderResult,
537 partial_graph_recoverer: &mut crate::barrier::partial_graph::PartialGraphRecoverer<'_>,
538 ) -> MetaResult<Self> {
539 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
540
541 if committed_epoch >= snapshot_epoch {
542 info!(
544 %job_id,
545 committed_epoch,
546 snapshot_epoch,
547 "recovered idle batch refresh job (no partial graph)"
548 );
549 return Ok(Self {
550 job_id,
551 partial_graph_id,
552 snapshot_backfill_upstream_tables,
553 snapshot_epoch,
554 max_committed_epoch: Some(committed_epoch),
555 status: BatchRefreshJobStatus::Idle,
556 });
557 }
558
559 info!(
561 %job_id,
562 committed_epoch,
563 snapshot_epoch,
564 "recovered batch refresh job to consuming snapshot"
565 );
566
567 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
568 let mut pending_non_checkpoint_barriers = vec![];
569
570 let locality_fragment_state_table_mapping =
571 crate::barrier::rpc::build_locality_fragment_state_table_mapping(
572 &render_result.fragment_infos,
573 );
574 let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
575 &backfill_order,
576 &render_result.fragment_infos,
577 locality_fragment_state_table_mapping,
578 );
579
580 let create_mview_tracker = CreateMviewProgressTracker::recover(
581 job_id,
582 &render_result.fragment_infos,
583 backfill_order_state,
584 version_stat,
585 );
586
587 let first_barrier_info = super::new_fake_barrier(
588 &mut prev_epoch_fake_physical_time,
589 &mut pending_non_checkpoint_barriers,
590 PbBarrierKind::Initial,
591 );
592
593 partial_graph_recoverer.recover_graph(
594 partial_graph_id,
595 initial_mutation,
596 &first_barrier_info,
597 &render_result.node_actors,
598 render_result.state_table_ids.iter().copied(),
599 render_result.actors_to_create,
600 BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
601 )?;
602
603 Ok(Self {
604 job_id,
605 partial_graph_id,
606 snapshot_backfill_upstream_tables,
607 snapshot_epoch,
608 max_committed_epoch: Some(committed_epoch),
609 status: BatchRefreshJobStatus::ConsumingSnapshot {
610 prev_epoch_fake_physical_time,
611 version_stats: version_stat.clone(),
612 create_mview_tracker,
613 fragment_infos: render_result.fragment_infos,
614 snapshot_epoch,
615 pending_non_checkpoint_barriers,
616 node_actors: render_result.node_actors,
617 state_table_ids: render_result.state_table_ids,
618 },
619 })
620 }
621}
622
623impl BatchRefreshJobCheckpointControl {
626 fn inject_barrier(
627 partial_graph_id: PartialGraphId,
628 partial_graph_manager: &mut PartialGraphManager,
629 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
630 state_table_ids: &HashSet<TableId>,
631 barrier_info: BarrierInfo,
632 new_actors: Option<StreamJobActorsToCreate>,
633 mutation: Option<Mutation>,
634 notifiers: Vec<Notifier>,
635 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
636 is_stop: bool,
637 ) -> MetaResult<()> {
638 if is_stop {
639 assert!(
640 matches!(&mutation, Some(Mutation::Stop(_))),
641 "stop barrier must carry a Stop mutation"
642 );
643 }
644 partial_graph_manager.inject_barrier(
645 partial_graph_id,
646 mutation,
647 node_actors,
648 state_table_ids.iter().copied(),
649 if is_stop {
650 itertools::Either::Left(std::iter::empty())
652 } else {
653 itertools::Either::Right(node_actors.keys().copied())
654 },
655 new_actors,
656 PartialGraphBarrierInfo::new(
657 first_create_info.map_or_else(
658 PostCollectCommand::barrier,
659 CreateSnapshotBackfillJobCommandInfo::into_post_collect,
660 ),
661 barrier_info,
662 notifiers,
663 state_table_ids.clone(),
664 ),
665 )?;
666 Ok(())
667 }
668}
669
670impl BatchRefreshJobCheckpointControl {
    /// Reacts to a new upstream barrier by injecting barrier(s) into the job's own
    /// partial graph.
    ///
    /// Nothing happens unless the job is in `ConsumingSnapshot`. In that state:
    /// - if the progress tracker has finished, the upstream mutation is discarded and
    ///   two barriers are injected back to back: a final checkpoint barrier whose
    ///   `curr_epoch` is the snapshot epoch (carrying `notifiers`), followed by a stop
    ///   barrier with a `Stop` mutation for all actors. The job then moves to
    ///   `FinishingSnapshot`;
    /// - otherwise a single fake barrier of the same kind as the upstream barrier is
    ///   injected, carrying the upstream mutation if there is one, or else a
    ///   `StartFragmentBackfill` mutation for any pending backfill nodes.
    ///
    /// # Errors
    ///
    /// Returns an error if any barrier injection fails.
    pub(crate) fn on_new_upstream_barrier(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        // Only a job that is still consuming the snapshot follows upstream barriers.
        if !matches!(self.status, BatchRefreshJobStatus::ConsumingSnapshot { .. }) {
            return Ok(());
        }
        // Split into independently consumable parts; both must be consumed before
        // returning (see the assertions at the end).
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };

        let is_finished = matches!(
            &self.status,
            BatchRefreshJobStatus::ConsumingSnapshot { create_mview_tracker, .. }
                if create_mview_tracker.is_finished()
        );

        if is_finished {
            // The upstream mutation is intentionally dropped on the finishing path.
            mutation.take();

            // Move the state out by value; `Idle` is only a placeholder here.
            // NOTE(review): if either injection below fails, `?` returns with the
            // status left as `Idle` — confirm callers treat that as acceptable.
            let old_status = replace(&mut self.status, BatchRefreshJobStatus::Idle);
            let BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                mut pending_non_checkpoint_barriers,
                snapshot_epoch,
                fragment_infos,
                create_mview_tracker,
                node_actors,
                state_table_ids,
                ..
            } = old_status
            else {
                unreachable!()
            };

            let tracking_job = create_mview_tracker.into_tracking_job();

            pending_non_checkpoint_barriers.push(snapshot_epoch);
            // Final checkpoint: from the last fake epoch to the real snapshot epoch.
            let prev_epoch = Epoch::from_physical_time(prev_epoch_fake_physical_time);
            let final_checkpoint = BarrierInfo {
                curr_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
                prev_epoch: TracedEpoch::new(prev_epoch),
                kind: BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers)),
            };

            // Stop barrier: follows the snapshot epoch and uses `u64::MAX` as its
            // current epoch.
            let stop_barrier = BarrierInfo {
                prev_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
                curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
                kind: BarrierKind::Checkpoint(vec![snapshot_epoch]),
            };

            // Every actor of the job is stopped by the stop barrier.
            let stop_actors: Vec<ActorId> = fragment_infos
                .values()
                .flat_map(|f| f.actors.keys().copied())
                .collect();

            // The upstream notifiers ride on the final checkpoint barrier.
            Self::inject_barrier(
                self.partial_graph_id,
                partial_graph_manager,
                &node_actors,
                &state_table_ids,
                final_checkpoint,
                None,
                None,
                take(&mut notifiers),
                None,
                false,
            )?;
            Self::inject_barrier(
                self.partial_graph_id,
                partial_graph_manager,
                &node_actors,
                &state_table_ids,
                stop_barrier,
                None,
                Some(Mutation::Stop(StopMutation {
                    actors: stop_actors,
                    dropped_sink_fragments: vec![],
                })),
                vec![],
                None,
                true,
            )?;

            self.status = BatchRefreshJobStatus::FinishingSnapshot {
                tracking_job: Some(tracking_job),
                fragment_infos,
            };
        } else {
            let BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                node_actors,
                state_table_ids,
                ..
            } = &mut self.status
            else {
                unreachable!("is_finished was false, status must be ConsumingSnapshot")
            };

            // Prefer the upstream mutation; pending backfill nodes are only taken
            // from the tracker when there is no upstream mutation (`or_else` is lazy).
            let mutation = mutation.take().or_else(|| {
                let pending_backfill_nodes = create_mview_tracker
                    .take_pending_backfill_nodes()
                    .collect_vec();
                if pending_backfill_nodes.is_empty() {
                    None
                } else {
                    Some(Mutation::StartFragmentBackfill(
                        StartFragmentBackfillMutation {
                            fragment_ids: pending_backfill_nodes,
                        },
                    ))
                }
            });
            // The fake barrier mirrors the kind of the upstream barrier.
            let barrier_to_inject = super::new_fake_barrier(
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                match barrier_info.kind {
                    BarrierKind::Barrier => PbBarrierKind::Barrier,
                    BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                    BarrierKind::Initial => {
                        unreachable!("upstream new epoch should not be initial")
                    }
                },
            );
            Self::inject_barrier(
                self.partial_graph_id,
                partial_graph_manager,
                node_actors,
                state_table_ids,
                barrier_to_inject,
                None,
                mutation,
                take(&mut notifiers),
                None,
                false,
            )?;
        }
        // Both branches must have consumed the upstream mutation and notifiers.
        assert!(mutation.is_none(), "must have consumed mutation");
        assert!(notifiers.is_empty(), "must consumed notifiers");
        Ok(())
    }
826
827 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
828 match &mut self.status {
829 BatchRefreshJobStatus::ConsumingSnapshot {
830 create_mview_tracker,
831 version_stats,
832 ..
833 } => {
834 for progress in collected_barrier
835 .resps
836 .values()
837 .flat_map(|resp| &resp.create_mview_progress)
838 {
839 create_mview_tracker.apply_progress(progress, version_stats);
840 }
841 create_mview_tracker.is_finished()
842 }
843 _ => false,
844 }
845 }
846}
847
848impl BatchRefreshJobCheckpointControl {
851 #[expect(clippy::type_complexity)]
852 pub(crate) fn start_completing(
853 &mut self,
854 partial_graph_manager: &mut PartialGraphManager,
855 ) -> Option<(
856 u64,
857 HashMap<WorkerId, BarrierCompleteResponse>,
858 PartialGraphBarrierInfo,
859 Option<TrackingJob>,
860 )> {
861 match &self.status {
862 BatchRefreshJobStatus::ConsumingSnapshot { .. }
863 | BatchRefreshJobStatus::FinishingSnapshot { .. } => {}
864 BatchRefreshJobStatus::Idle | BatchRefreshJobStatus::Resetting { .. } => {
865 return None;
866 }
867 };
868
869 partial_graph_manager
870 .start_completing(
871 self.partial_graph_id,
872 std::ops::Bound::Unbounded,
873 |_non_checkpoint_epoch, _resps, _| {
874 },
876 )
877 .map(|(epoch, resps, info)| {
878 let tracking_job = if epoch == self.snapshot_epoch {
882 match &mut self.status {
883 BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. } => Some(
884 tracking_job
885 .take()
886 .expect("tracking job should not have been taken yet"),
887 ),
888 _ => panic!(
889 "batch refresh job {}: expected FinishingSnapshot at snapshot epoch",
890 self.job_id
891 ),
892 }
893 } else {
894 None
895 };
896 (epoch, resps, info, tracking_job)
897 })
898 }
899
900 pub(super) fn ack_completed(
901 &mut self,
902 partial_graph_manager: &mut PartialGraphManager,
903 completed_epoch: u64,
904 ) {
905 partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
906 if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
907 assert!(completed_epoch > prev_max_committed_epoch);
908 }
909
910 if completed_epoch == self.snapshot_epoch {
911 match &self.status {
914 BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. } => {
915 assert!(
916 tracking_job.is_none(),
917 "tracking job should have been taken at start_completing"
918 );
919 }
920 _ => panic!(
921 "batch refresh job {}: expected FinishingSnapshot when completing snapshot epoch",
922 self.job_id
923 ),
924 }
925 info!(
926 job_id = %self.job_id,
927 completed_epoch,
928 "batch refresh job: transitioned to idle, removing partial graph"
929 );
930 partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
931 self.status = BatchRefreshJobStatus::Idle;
932 }
933 }
934
935 pub(super) fn on_partial_graph_reset(mut self) {
937 match &mut self.status {
938 BatchRefreshJobStatus::Resetting { notifiers } => {
939 for notifier in notifiers.drain(..) {
940 notifier.notify_collected();
941 }
942 }
943 _ => {
944 panic!(
945 "batch refresh job {}: on_partial_graph_reset in unexpected state {:?}",
946 self.job_id, self.status
947 );
948 }
949 }
950 }
951}
952
953impl BatchRefreshJobCheckpointControl {
956 pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
957 match &self.status {
958 BatchRefreshJobStatus::ConsumingSnapshot {
959 create_mview_tracker,
960 ..
961 } => {
962 let progress = if create_mview_tracker.is_finished() {
963 "Snapshot finished".to_owned()
964 } else {
965 let progress = create_mview_tracker.gen_backfill_progress();
966 format!("BatchRefresh Snapshot [{}]", progress)
967 };
968 Some(BackfillProgress {
969 progress,
970 backfill_type: PbBackfillType::SnapshotBackfill,
971 })
972 }
973 BatchRefreshJobStatus::FinishingSnapshot { .. } => Some(BackfillProgress {
974 progress: "BatchRefresh Stopping".to_owned(),
975 backfill_type: PbBackfillType::SnapshotBackfill,
976 }),
977 BatchRefreshJobStatus::Idle | BatchRefreshJobStatus::Resetting { .. } => None,
978 }
979 }
980
981 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
982 match &self.status {
983 BatchRefreshJobStatus::ConsumingSnapshot {
984 create_mview_tracker,
985 fragment_infos,
986 ..
987 } => create_mview_tracker.collect_fragment_progress(fragment_infos, true),
988 BatchRefreshJobStatus::FinishingSnapshot { fragment_infos, .. } => {
989 collect_done_fragments(self.job_id, fragment_infos)
990 }
991 _ => vec![],
992 }
993 }
994
995 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
997 (
998 self.snapshot_epoch,
999 self.snapshot_backfill_upstream_tables.clone(),
1000 )
1001 }
1002
1003 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
1004 match &self.status {
1005 BatchRefreshJobStatus::ConsumingSnapshot { fragment_infos, .. } => Some(fragment_infos),
1006 BatchRefreshJobStatus::FinishingSnapshot { .. }
1007 | BatchRefreshJobStatus::Idle
1008 | BatchRefreshJobStatus::Resetting { .. } => None,
1009 }
1010 }
1011
1012 pub(crate) fn is_snapshot_backfilling(&self) -> bool {
1013 matches!(
1014 self.status,
1015 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1016 | BatchRefreshJobStatus::FinishingSnapshot { .. }
1017 )
1018 }
1019}
1020
1021impl BatchRefreshJobCheckpointControl {
1024 pub(super) fn drop(
1026 &mut self,
1027 notifiers: &mut Vec<Notifier>,
1028 partial_graph_manager: &mut PartialGraphManager,
1029 ) -> bool {
1030 match &mut self.status {
1031 BatchRefreshJobStatus::Resetting {
1032 notifiers: existing_notifiers,
1033 } => {
1034 for notifier in &mut *notifiers {
1035 notifier.notify_started();
1036 }
1037 existing_notifiers.append(notifiers);
1038 true
1039 }
1040 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1041 | BatchRefreshJobStatus::FinishingSnapshot { .. } => {
1042 for notifier in &mut *notifiers {
1043 notifier.notify_started();
1044 }
1045 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1046 self.status = BatchRefreshJobStatus::Resetting {
1047 notifiers: take(notifiers),
1048 };
1049 true
1050 }
1051 BatchRefreshJobStatus::Idle => {
1052 for notifier in &mut *notifiers {
1055 notifier.notify_started();
1056 }
1057 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1058 self.status = BatchRefreshJobStatus::Resetting {
1059 notifiers: take(notifiers),
1060 };
1061 true
1062 }
1063 }
1064 }
1065
1066 pub(crate) fn reset(self) -> bool {
1071 match self.status {
1072 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1073 | BatchRefreshJobStatus::FinishingSnapshot { .. }
1074 | BatchRefreshJobStatus::Idle => false,
1075 BatchRefreshJobStatus::Resetting { notifiers } => {
1076 for notifier in notifiers {
1077 notifier.notify_collected();
1078 }
1079 true
1080 }
1081 }
1082 }
1083}
1084
/// Barrier metrics of a batch refresh job's partial graph, reported through the
/// `PartialGraphStat` implementation of this type.
struct BatchRefreshBarrierStats {
    /// Histogram that receives each observed barrier latency.
    barrier_latency: LabelGuardedHistogram,
    /// Gauge set to the number of in-flight barriers.
    inflight_barrier_num: LabelGuardedIntGauge,
}
1091
1092impl BatchRefreshBarrierStats {
1093 fn new(job_id: JobId, _snapshot_epoch: u64) -> Self {
1094 let table_id_str = format!("{}", job_id);
1095 Self {
1096 barrier_latency: GLOBAL_META_METRICS
1097 .snapshot_backfill_barrier_latency
1098 .with_guarded_label_values(&[table_id_str.as_str(), "batch_refresh_snapshot"]),
1099 inflight_barrier_num: GLOBAL_META_METRICS
1100 .snapshot_backfill_inflight_barrier_num
1101 .with_guarded_label_values(&[&table_id_str]),
1102 }
1103 }
1104}
1105
/// Forwards partial graph statistics to the job's metrics.
impl PartialGraphStat for BatchRefreshBarrierStats {
    /// Records `barrier_latency_secs` in the latency histogram; the epoch is ignored.
    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
        self.barrier_latency.observe(barrier_latency_secs);
    }

    /// Sets the in-flight gauge to `inflight_barrier_num`; the collected count is
    /// ignored.
    fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
        self.inflight_barrier_num.set(inflight_barrier_num as _);
    }
}