1use std::collections::{HashMap, HashSet};
26use std::mem::{replace, take};
27use std::sync::atomic::AtomicU32;
28
29use anyhow::anyhow;
30use itertools::Itertools;
31use risingwave_common::catalog::{DatabaseId, TableId};
32use risingwave_common::id::JobId;
33use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
34use risingwave_common::util::epoch::{Epoch, EpochPair};
35use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
36use risingwave_pb::common::WorkerNode;
37use risingwave_pb::ddl_service::PbBackfillType;
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
40use risingwave_pb::stream_plan::barrier::PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::{AddMutation, StartFragmentBackfillMutation, StopMutation};
43use risingwave_pb::stream_service::BarrierCompleteResponse;
44use tracing::{debug, info};
45
46use crate::MetaResult;
47use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
48use crate::barrier::command::PostCollectCommand;
49use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
50use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
51use crate::barrier::info::BarrierInfo;
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::{
54 CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphStat,
55};
56use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
57use crate::barrier::rpc::to_partial_graph_id;
58use crate::barrier::{
59 BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
60};
61use crate::controller::fragment::InflightFragmentInfo;
62use crate::controller::scale::{
63 ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
64 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
65};
66use crate::model::{
67 FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate, StreamingJobModelContextExt,
68};
69use crate::rpc::metrics::GLOBAL_META_METRICS;
70use crate::stream::ExtendedFragmentBackfillOrder;
71
/// Logical fragment graph of a batch refresh job, before any actors are rendered.
///
/// Passed to [`BatchRefreshJobCheckpointControl::render_actors_and_build_job_info`], which
/// assigns actors to these fragments and builds the in-flight job info.
#[derive(Debug)]
pub(crate) struct BatchRefreshLogicalFragments {
    /// Fragments of the job, keyed by fragment id.
    pub fragments: HashMap<FragmentId, LoadedFragment>,
    /// Downstream relations between the fragments. `NoShuffle` relations are used to group
    /// fragments into ensembles that are rendered onto a shared actor template.
    pub downstreams: FragmentDownstreamRelation,
}
89
/// Output of [`BatchRefreshJobCheckpointControl::render_actors_and_build_job_info`].
#[derive(Debug)]
pub(crate) struct BatchRefreshRenderResult {
    /// In-flight info (actors with their worker and vnode bitmap, nodes, state tables) of every
    /// fragment of the job.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Actor ids grouped by worker, as computed by `InflightFragmentInfo::actor_ids_to_collect`.
    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State table ids of all rendered fragments.
    pub state_table_ids: HashSet<TableId>,
    /// Actors to build on compute nodes, collected from the fragment edge builder.
    pub actors_to_create: StreamJobActorsToCreate,
}
101
/// Inputs needed by `start_refresh_run` to start one refresh run of an idle batch refresh job.
#[derive(Debug)]
pub(crate) struct BatchRefreshJobTriggerContext {
    /// Logical fragments of the job, re-rendered into fresh actors for the run.
    pub fragments: HashMap<FragmentId, LoadedFragment>,
    /// Downstream relations between `fragments`.
    pub downstreams: FragmentDownstreamRelation,
    /// Catalog model of the streaming job, used when rendering actors.
    pub streaming_job_model: streaming_job::Model,
    /// Definition text copied into every rendered actor.
    pub definition: String,
    /// Resource group used when rendering actors.
    pub database_resource_group: String,
    /// Log epochs of the upstream tables, consumed by `resolve_log_epoch_barriers`.
    /// NOTE(review): the exact meaning of `(Vec<u64>, u64)` is defined by that helper, which is
    /// not visible here. Presumably it is (non-checkpoint epochs, checkpoint epoch); confirm.
    pub upstream_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Upstream epoch the run should catch up to. The run is only started if the resolved log
    /// barriers end exactly at this epoch.
    pub target_upstream_epoch: u64,
}
121
/// State machine of a batch refresh job.
///
/// Transitions visible in this file:
/// - `ConsumingSnapshot` -> `FinishingSnapshot` once the backfill progress tracker reports
///   finished (`on_new_upstream_barrier`).
/// - `FinishingSnapshot` -> `Idle` when the barrier at the snapshot epoch is acked
///   (`ack_completed`). This also removes the job's partial graph.
/// - `ConsumingLogStore` -> `Idle` when the barrier at `target_upstream_epoch` is acked
///   (`ack_completed`). This also removes the job's partial graph.
/// - `start_refresh_run` must be called in `Idle`.
#[derive(Debug)]
enum BatchRefreshJobStatus {
    /// Snapshot backfill runs in the job's own partial graph. Each upstream barrier drives the
    /// injection of one fake barrier.
    ConsumingSnapshot {
        /// Physical time from which the `prev_epoch` of the next fake barrier is derived.
        prev_epoch_fake_physical_time: u64,
        /// Version stats passed to the progress tracker when applying reported progress.
        version_stats: HummockVersionStats,
        /// Tracks backfill progress; snapshot consumption ends once it reports finished.
        create_mview_tracker: CreateMviewProgressTracker,
        /// Upstream snapshot epoch; used as `curr_epoch` of the final checkpoint barrier.
        snapshot_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        /// Fake epochs that have not yet been included in a checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
        /// Actor ids per worker, passed to every barrier injection.
        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: HashSet<TableId>,
    },
    /// The final checkpoint barrier and the stop barrier have been injected; waiting for them
    /// to complete.
    FinishingSnapshot {
        /// Handed out by `start_completing` when the barrier at the snapshot epoch starts
        /// completing; `None` afterwards.
        tracking_job: Option<TrackingJob>,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    },
    /// No refresh is running. `last_committed_epoch` is the epoch the job last committed at.
    Idle { last_committed_epoch: u64 },
    /// A refresh run over the upstream log store is being initialized.
    /// NOTE(review): presumably entered from `start_refresh_run`, whose body is not fully
    /// visible here; confirm.
    InitializingBatchRefresh {
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        node_actors: HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: HashSet<TableId>,
        /// Barriers over upstream log epochs that still have to be injected (assumed; confirm).
        pending_log_barriers: Vec<BarrierInfo>,
        /// Reported by `pinned_upstream_log_epoch` while in this state.
        logstore_start_epoch: u64,
        /// Upstream epoch this run catches up to.
        target_upstream_epoch: u64,
    },
    /// Consuming the upstream log store. The job becomes `Idle` once the barrier at
    /// `target_upstream_epoch` is acked.
    ConsumingLogStore {
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        /// Reported by `pinned_upstream_log_epoch` while in this state.
        logstore_start_epoch: u64,
        /// Completing this epoch ends the run.
        target_upstream_epoch: u64,
    },
    /// The partial graph is being reset. `notifiers` are notified as collected in
    /// `on_partial_graph_reset`.
    Resetting { notifiers: Vec<Notifier> },
}
182
/// Barrier/checkpoint control for a single batch refresh job.
///
/// The job runs in its own partial graph while consuming its snapshot or the upstream log
/// store, and it has no running partial graph while `Idle`. See [`BatchRefreshJobStatus`] for
/// the states.
#[derive(Debug)]
pub(crate) struct BatchRefreshJobCheckpointControl {
    job_id: JobId,
    /// Derived from the database id and job id via `to_partial_graph_id`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables read by the job; reported by `pinned_upstream_log_epoch`.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the initial upstream snapshot.
    snapshot_epoch: u64,
    /// Minimum gap (in seconds of epoch physical time) between the job's last committed epoch
    /// and the upstream committed epoch before `should_start_refresh` returns true.
    batch_refresh_seconds: u64,

    status: BatchRefreshJobStatus,
}
204
205impl BatchRefreshJobCheckpointControl {
208 pub(crate) fn render_actors_and_build_job_info(
219 fragments: &HashMap<FragmentId, LoadedFragment>,
220 downstreams: &FragmentDownstreamRelation,
221 definition: &str,
222 actor_id_generator: &AtomicU32,
224 worker_nodes: &HashMap<WorkerId, WorkerNode>,
225 database_resource_group: &str,
226 streaming_job_model: &streaming_job::Model,
227 partial_graph_id: PartialGraphId,
229 ) -> MetaResult<BatchRefreshRenderResult> {
230 let ensembles = Self::resolve_ensembles(fragments, downstreams)?;
232
233 let mut actor_assignments: HashMap<
235 FragmentId,
236 HashMap<ActorId, (WorkerId, Option<risingwave_common::bitmap::Bitmap>)>,
237 > = HashMap::new();
238
239 for ensemble in &ensembles {
240 let first_component = ensemble
242 .component_fragments()
243 .next()
244 .expect("ensemble must have at least one component");
245 let fragment = &fragments[&first_component];
246 let distribution_type = fragment.distribution_type;
247 let vnode_count = fragment.vnode_count;
248
249 for fid in ensemble.component_fragments() {
251 let f = &fragments[&fid];
252 assert_eq!(
253 vnode_count, f.vnode_count,
254 "fragments {} and {} in same ensemble have different vnode counts",
255 first_component, fid,
256 );
257 }
258
259 let entry_fragment_parallelism = Itertools::exactly_one(
260 ensemble
261 .entry_fragments()
262 .map(|fid| fragments[&fid].parallelism.clone())
263 .dedup(),
264 )
265 .map_err(|_| {
266 anyhow!(
267 "entry fragments have inconsistent parallelism settings in batch refresh job"
268 )
269 })?;
270
271 let actor_template = EnsembleActorTemplate::render_new(
272 streaming_job_model,
273 worker_nodes,
274 entry_fragment_parallelism,
275 database_resource_group.to_owned(),
276 distribution_type,
277 vnode_count,
278 )?;
279
280 for fid in ensemble.component_fragments() {
281 let f = &fragments[&fid];
282 let aligner =
283 ComponentFragmentAligner::new_persistent(&actor_template, actor_id_generator);
284 let assignments = aligner.align_component_actor(f.distribution_type);
285 actor_assignments.insert(fid, assignments);
286 }
287 }
288
289 let mut stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
291 let mut actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
292
293 for (fragment_id, assignments) in &actor_assignments {
294 let mut actors = Vec::with_capacity(assignments.len());
295 for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
296 actor_location.insert(actor_id, *worker_id);
297 let stream_context = streaming_job_model.stream_context();
298 actors.push(StreamActor {
299 actor_id,
300 fragment_id: *fragment_id,
301 vnode_bitmap: vnode_bitmap.clone(),
302 mview_definition: definition.to_owned(),
303 expr_context: Some(stream_context.to_expr_context()),
304 config_override: stream_context.config_override.clone(),
305 });
306 }
307 stream_actors.insert(*fragment_id, actors);
308 }
309
310 let fragment_infos: HashMap<FragmentId, InflightFragmentInfo> = fragments
312 .iter()
313 .map(|(fragment_id, loaded)| {
314 let actors = stream_actors
315 .get(fragment_id)
316 .into_iter()
317 .flatten()
318 .map(|actor| {
319 (
320 actor.actor_id,
321 crate::controller::fragment::InflightActorInfo {
322 worker_id: actor_location[&actor.actor_id],
323 vnode_bitmap: actor.vnode_bitmap.clone(),
324 splits: vec![], },
326 )
327 })
328 .collect();
329 (
330 *fragment_id,
331 InflightFragmentInfo {
332 fragment_id: *fragment_id,
333 distribution_type: loaded.distribution_type,
334 fragment_type_mask: loaded.fragment_type_mask,
335 vnode_count: loaded.vnode_count,
336 nodes: loaded.nodes.clone(),
337 actors,
338 state_table_ids: loaded.state_table_ids.clone(),
339 },
340 )
341 })
342 .collect();
343
344 let mut builder = FragmentEdgeBuilder::new(fragment_infos.values().map(|f| {
346 (
347 f.fragment_id,
348 EdgeBuilderFragmentInfo::from_inflight_with_worker_nodes(
349 f,
350 partial_graph_id,
351 worker_nodes,
352 ),
353 )
354 }));
355 builder.add_relations(downstreams);
356 let mut edges = builder.build();
357
358 let actors_to_create = edges.collect_actors_to_create(fragment_infos.values().map(|f| {
359 (
360 f.fragment_id,
361 &f.nodes,
362 f.actors.iter().map(|(actor_id, actor)| {
363 let sa = stream_actors[&f.fragment_id]
364 .iter()
365 .find(|a| a.actor_id == *actor_id)
366 .expect("should exist");
367 (sa, actor.worker_id)
368 }),
369 vec![], )
371 }));
372
373 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
375 let state_table_ids =
376 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
377
378 Ok(BatchRefreshRenderResult {
379 fragment_infos,
380 node_actors,
381 state_table_ids,
382 actors_to_create,
383 })
384 }
385
386 pub(crate) fn build_initial_partial_graph_mutation(
391 render_result: &BatchRefreshRenderResult,
392 backfill_ordering: &ExtendedFragmentBackfillOrder,
393 ) -> Mutation {
394 let added_actors: Vec<ActorId> = render_result
395 .fragment_infos
396 .values()
397 .flat_map(|f| f.actors.keys().copied())
398 .collect();
399 let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_ordering)
400 .into_iter()
401 .collect();
402 Mutation::Add(AddMutation {
403 actor_dispatchers: Default::default(),
404 added_actors,
405 actor_splits: Default::default(),
406 pause: false,
407 subscriptions_to_add: Default::default(),
408 backfill_nodes_to_pause,
409 actor_cdc_table_snapshot_splits: None,
410 new_upstream_sinks: Default::default(),
411 })
412 }
413
414 fn resolve_ensembles(
416 fragments: &HashMap<FragmentId, LoadedFragment>,
417 downstreams: &FragmentDownstreamRelation,
418 ) -> MetaResult<Vec<NoShuffleEnsemble>> {
419 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
420 for (upstream_fid, relations) in downstreams {
421 for rel in relations {
422 if rel.dispatcher_type == DispatcherType::NoShuffle {
423 new_no_shuffle
424 .entry(*upstream_fid)
425 .or_default()
426 .insert(rel.downstream_fragment_id);
427 }
428 }
429 }
430
431 let mut ensembles = if new_no_shuffle.is_empty() {
432 Vec::new()
433 } else {
434 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
435 .iter()
436 .flat_map(|(u, ds)| ds.iter().map(move |d| (*u, *d)))
437 .collect();
438 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
439 .iter()
440 .flat_map(|(u, d)| [*u, *d])
441 .collect::<HashSet<_>>()
442 .into_iter()
443 .collect();
444 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
445 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
446 };
447
448 let covered: HashSet<FragmentId> = ensembles
450 .iter()
451 .flat_map(|e| e.component_fragments())
452 .collect();
453 for fragment_id in fragments.keys() {
454 if !covered.contains(fragment_id) {
455 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
456 }
457 }
458
459 Ok(ensembles)
460 }
461}
462
impl BatchRefreshJobCheckpointControl {
    /// Creates the checkpoint control for a newly created batch refresh job.
    ///
    /// This renders the job's actors, registers a new partial graph for the job and injects
    /// its first barrier. That barrier is a fake checkpoint barrier carrying the `Add` mutation
    /// for all rendered actors, the actors to create, the given `notifiers` and `create_info`.
    /// The job starts in `ConsumingSnapshot`.
    ///
    /// # Errors
    ///
    /// Returns an error if rendering fails. If injecting the first barrier fails, the partial
    /// graph registration is marked failed (`graph_adder.failed()`) and the error is returned.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        database_id: DatabaseId,
        job_id: JobId,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        logical: &BatchRefreshLogicalFragments,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
        batch_refresh_seconds: u64,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            "new batch refresh job"
        );

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
        let backfill_ordering = &create_info.info.fragment_backfill_ordering;
        let actor_id_generator = partial_graph_manager
            .control_stream_manager()
            .env
            .actor_id_generator();

        let render_result = Self::render_actors_and_build_job_info(
            &logical.fragments,
            &logical.downstreams,
            &create_info.info.definition,
            actor_id_generator,
            worker_nodes,
            &create_info.info.database_resource_group,
            &create_info.info.streaming_job_model,
            partial_graph_id,
        )?;
        let initial_partial_graph_mutation =
            Self::build_initial_partial_graph_mutation(&render_result, backfill_ordering);

        let backfill_order_state = BackfillOrderState::new(
            backfill_ordering,
            &render_result.fragment_infos,
            create_info
                .info
                .locality_fragment_state_table_mapping
                .clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &render_result.fragment_infos,
            backfill_order_state,
            version_stat,
        );

        // Fake epochs of the job's own partial graph start from physical time 0.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = super::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
        );

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &render_result.node_actors,
            &render_result.state_table_ids,
            initial_barrier_info,
            Some(render_result.actors_to_create),
            Some(initial_partial_graph_mutation),
            notifiers,
            Some(create_info),
            false,
        ) {
            // Roll back the partial graph registration before surfacing the error.
            graph_adder.failed();
            return Err(e);
        }

        graph_adder.added();
        // The first fake barrier is a checkpoint, so no non-checkpoint epoch is expected to
        // remain pending here.
        assert!(pending_non_checkpoint_barriers.is_empty());
        let this = Self {
            partial_graph_id,
            job_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            batch_refresh_seconds,

            status: BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_epoch,
                fragment_infos: render_result.fragment_infos,
                pending_non_checkpoint_barriers,
                node_actors: render_result.node_actors,
                state_table_ids: render_result.state_table_ids,
            },
        };
        Ok(this)
    }

    /// Rebuilds the checkpoint control of a batch refresh job during recovery.
    ///
    /// If `committed_epoch >= snapshot_epoch`, the job is recovered as `Idle` with
    /// `last_committed_epoch = committed_epoch` and no partial graph is recovered.
    ///
    /// Otherwise the job's partial graph is recovered through `partial_graph_recoverer`. This
    /// uses `initial_mutation`, the rendered actors and an `Initial` fake barrier whose fake
    /// physical time starts from `committed_epoch`'s physical time. The job then resumes in
    /// `ConsumingSnapshot`, with backfill order state rebuilt from the fragment infos.
    ///
    /// # Errors
    ///
    /// Propagates errors from `recover_graph`.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        backfill_order: ExtendedFragmentBackfillOrder,
        version_stat: &HummockVersionStats,
        initial_mutation: Mutation,
        render_result: BatchRefreshRenderResult,
        partial_graph_recoverer: &mut crate::barrier::partial_graph::PartialGraphRecoverer<'_>,
        batch_refresh_seconds: u64,
    ) -> MetaResult<Self> {
        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        if committed_epoch >= snapshot_epoch {
            info!(
                %job_id,
                committed_epoch,
                snapshot_epoch,
                "recovered idle batch refresh job (no partial graph)"
            );
            return Ok(Self {
                job_id,
                partial_graph_id,
                snapshot_backfill_upstream_tables,
                snapshot_epoch,
                batch_refresh_seconds,

                status: BatchRefreshJobStatus::Idle {
                    last_committed_epoch: committed_epoch,
                },
            });
        }

        info!(
            %job_id,
            committed_epoch,
            snapshot_epoch,
            "recovered batch refresh job to consuming snapshot"
        );

        // Fake epochs continue from the physical time of the last committed epoch.
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];

        let locality_fragment_state_table_mapping =
            crate::barrier::rpc::build_locality_fragment_state_table_mapping(
                &render_result.fragment_infos,
            );
        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
            &backfill_order,
            &render_result.fragment_infos,
            locality_fragment_state_table_mapping,
        );

        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &render_result.fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let first_barrier_info = super::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &render_result.node_actors,
            render_result.state_table_ids.iter().copied(),
            render_result.actors_to_create,
            BatchRefreshBarrierStats::new(job_id, snapshot_epoch),
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            batch_refresh_seconds,
            status: BatchRefreshJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                fragment_infos: render_result.fragment_infos,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
                node_actors: render_result.node_actors,
                state_table_ids: render_result.state_table_ids,
            },
        })
    }
}
681
682impl BatchRefreshJobCheckpointControl {
685 fn inject_barrier(
686 partial_graph_id: PartialGraphId,
687 partial_graph_manager: &mut PartialGraphManager,
688 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
689 state_table_ids: &HashSet<TableId>,
690 barrier_info: BarrierInfo,
691 new_actors: Option<StreamJobActorsToCreate>,
692 mutation: Option<Mutation>,
693 notifiers: Vec<Notifier>,
694 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
695 is_stop: bool,
696 ) -> MetaResult<()> {
697 if is_stop {
698 assert!(
699 matches!(&mutation, Some(Mutation::Stop(_))),
700 "stop barrier must carry a Stop mutation"
701 );
702 }
703 partial_graph_manager.inject_barrier(
704 partial_graph_id,
705 mutation,
706 node_actors,
707 state_table_ids.iter().copied(),
708 if is_stop {
709 itertools::Either::Left(std::iter::empty())
711 } else {
712 itertools::Either::Right(node_actors.keys().copied())
713 },
714 new_actors,
715 PartialGraphBarrierInfo::new(
716 first_create_info.map_or_else(
717 PostCollectCommand::barrier,
718 CreateSnapshotBackfillJobCommandInfo::into_post_collect,
719 ),
720 barrier_info,
721 notifiers,
722 state_table_ids.clone(),
723 ),
724 )?;
725 Ok(())
726 }
727}
728
729impl BatchRefreshJobCheckpointControl {
732 pub(crate) fn on_new_upstream_barrier(
733 &mut self,
734 partial_graph_manager: &mut PartialGraphManager,
735 barrier_info: &BarrierInfo,
736 mutation: Option<(Mutation, Vec<Notifier>)>,
737 ) -> MetaResult<()> {
738 if !matches!(self.status, BatchRefreshJobStatus::ConsumingSnapshot { .. }) {
739 return Ok(());
742 }
743 let (mut mutation, mut notifiers) = match mutation {
744 Some((mutation, notifiers)) => (Some(mutation), notifiers),
745 None => (None, vec![]),
746 };
747
748 let is_finished = matches!(
750 &self.status,
751 BatchRefreshJobStatus::ConsumingSnapshot { create_mview_tracker, .. }
752 if create_mview_tracker.is_finished()
753 );
754
755 if is_finished {
756 mutation.take();
758
759 let old_status = replace(
762 &mut self.status,
763 BatchRefreshJobStatus::Idle {
764 last_committed_epoch: 0,
765 },
766 );
767 let BatchRefreshJobStatus::ConsumingSnapshot {
768 prev_epoch_fake_physical_time,
769 mut pending_non_checkpoint_barriers,
770 snapshot_epoch,
771 fragment_infos,
772 create_mview_tracker,
773 node_actors,
774 state_table_ids,
775 ..
776 } = old_status
777 else {
778 unreachable!()
779 };
780
781 let tracking_job = create_mview_tracker.into_tracking_job();
782
783 pending_non_checkpoint_barriers.push(snapshot_epoch);
785 let prev_epoch = Epoch::from_physical_time(prev_epoch_fake_physical_time);
786 let final_checkpoint = BarrierInfo {
787 curr_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
788 prev_epoch: TracedEpoch::new(prev_epoch),
789 kind: BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers)),
790 };
791
792 let stop_barrier = BarrierInfo {
794 prev_epoch: TracedEpoch::new(Epoch(snapshot_epoch)),
795 curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
796 kind: BarrierKind::Checkpoint(vec![snapshot_epoch]),
797 };
798
799 let stop_actors: Vec<ActorId> = fragment_infos
800 .values()
801 .flat_map(|f| f.actors.keys().copied())
802 .collect();
803
804 Self::inject_barrier(
805 self.partial_graph_id,
806 partial_graph_manager,
807 &node_actors,
808 &state_table_ids,
809 final_checkpoint,
810 None,
811 None,
812 take(&mut notifiers),
813 None,
814 false,
815 )?;
816 Self::inject_barrier(
817 self.partial_graph_id,
818 partial_graph_manager,
819 &node_actors,
820 &state_table_ids,
821 stop_barrier,
822 None,
823 Some(Mutation::Stop(StopMutation {
824 actors: stop_actors,
825 dropped_sink_fragments: vec![],
826 })),
827 vec![],
828 None,
829 true,
830 )?;
831
832 self.status = BatchRefreshJobStatus::FinishingSnapshot {
833 tracking_job: Some(tracking_job),
834 fragment_infos,
835 };
836 } else {
837 let BatchRefreshJobStatus::ConsumingSnapshot {
839 prev_epoch_fake_physical_time,
840 pending_non_checkpoint_barriers,
841 create_mview_tracker,
842 node_actors,
843 state_table_ids,
844 ..
845 } = &mut self.status
846 else {
847 unreachable!("is_finished was false, status must be ConsumingSnapshot")
848 };
849
850 let mutation = mutation.take().or_else(|| {
852 let pending_backfill_nodes = create_mview_tracker
853 .take_pending_backfill_nodes()
854 .collect_vec();
855 if pending_backfill_nodes.is_empty() {
856 None
857 } else {
858 Some(Mutation::StartFragmentBackfill(
859 StartFragmentBackfillMutation {
860 fragment_ids: pending_backfill_nodes,
861 },
862 ))
863 }
864 });
865 let barrier_to_inject = super::new_fake_barrier(
866 prev_epoch_fake_physical_time,
867 pending_non_checkpoint_barriers,
868 match barrier_info.kind {
869 BarrierKind::Barrier => PbBarrierKind::Barrier,
870 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
871 BarrierKind::Initial => {
872 unreachable!("upstream new epoch should not be initial")
873 }
874 },
875 );
876 Self::inject_barrier(
877 self.partial_graph_id,
878 partial_graph_manager,
879 node_actors,
880 state_table_ids,
881 barrier_to_inject,
882 None,
883 mutation,
884 take(&mut notifiers),
885 None,
886 false,
887 )?;
888 }
889 assert!(mutation.is_none(), "must have consumed mutation");
890 assert!(notifiers.is_empty(), "must consumed notifiers");
891 Ok(())
892 }
893
894 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
895 match &mut self.status {
896 BatchRefreshJobStatus::ConsumingSnapshot {
897 create_mview_tracker,
898 version_stats,
899 ..
900 } => {
901 for progress in collected_barrier
902 .resps
903 .values()
904 .flat_map(|resp| &resp.create_mview_progress)
905 {
906 create_mview_tracker.apply_progress(progress, version_stats);
907 }
908 create_mview_tracker.is_finished()
909 }
910 BatchRefreshJobStatus::InitializingBatchRefresh { .. }
911 | BatchRefreshJobStatus::ConsumingLogStore { .. } => {
912 false
914 }
915 _ => false,
916 }
917 }
918}
919
impl BatchRefreshJobCheckpointControl {
    /// Starts completing the next collected barrier of the job's partial graph, if any.
    ///
    /// Returns `None` immediately in the `Idle`, `InitializingBatchRefresh` and `Resetting`
    /// states. Otherwise it delegates to `PartialGraphManager::start_completing` with an
    /// unbounded upper bound. It returns `(epoch, responses, barrier info, tracking job)` for
    /// the barrier that started completing.
    ///
    /// The tracking job is handed out exactly once: when the job is in `FinishingSnapshot` and
    /// the completing epoch equals the snapshot epoch. It is `None` in every other case.
    ///
    /// # Panics
    ///
    /// Panics if the tracking job has already been taken for the snapshot epoch.
    #[expect(clippy::type_complexity)]
    pub(crate) fn start_completing(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<(
        u64,
        HashMap<WorkerId, BarrierCompleteResponse>,
        PartialGraphBarrierInfo,
        Option<TrackingJob>,
    )> {
        match &self.status {
            BatchRefreshJobStatus::ConsumingSnapshot { .. }
            | BatchRefreshJobStatus::FinishingSnapshot { .. }
            | BatchRefreshJobStatus::ConsumingLogStore { .. } => {}
            BatchRefreshJobStatus::Idle { .. }
            | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
            | BatchRefreshJobStatus::Resetting { .. } => {
                return None;
            }
        };

        partial_graph_manager
            .start_completing(
                self.partial_graph_id,
                std::ops::Bound::Unbounded,
                |_non_checkpoint_epoch, _resps, _| {
                    // No per-epoch handling is done in this callback.
                },
            )
            .map(|(epoch, resps, info)| {
                let tracking_job = match &mut self.status {
                    BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. }
                        if epoch == self.snapshot_epoch =>
                    {
                        Some(
                            tracking_job
                                .take()
                                .expect("tracking job should not have been taken yet"),
                        )
                    }
                    _ => None,
                };
                (epoch, resps, info, tracking_job)
            })
    }

    /// Acknowledges that the barrier at `completed_epoch` has been completed.
    ///
    /// In most states this forwards the ack to the partial graph manager. It also finishes the
    /// job's current run in two cases:
    /// - In `FinishingSnapshot`, completing the snapshot epoch removes the partial graph and
    ///   moves the job to `Idle` with that epoch.
    /// - In `ConsumingLogStore`, completing `target_upstream_epoch` removes the partial graph
    ///   and moves the job to `Idle` with the target epoch.
    ///
    /// In `Resetting` this is a no-op.
    ///
    /// # Panics
    ///
    /// Panics in one of these cases:
    /// - the job is `Idle` or `InitializingBatchRefresh`;
    /// - the tracking job was not taken before the snapshot epoch is acked.
    pub(super) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        completed_epoch: u64,
    ) {
        match &self.status {
            BatchRefreshJobStatus::ConsumingSnapshot { .. } => {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
            }
            BatchRefreshJobStatus::FinishingSnapshot { tracking_job, .. }
                if completed_epoch == self.snapshot_epoch =>
            {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
                assert!(
                    tracking_job.is_none(),
                    "tracking job should have been taken at start_completing"
                );
                info!(
                    job_id = %self.job_id,
                    completed_epoch,
                    "batch refresh job: snapshot done, transitioned to idle, removing partial graph"
                );
                partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
                self.status = BatchRefreshJobStatus::Idle {
                    last_committed_epoch: completed_epoch,
                };
            }
            BatchRefreshJobStatus::FinishingSnapshot { .. } => {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
            }
            BatchRefreshJobStatus::ConsumingLogStore {
                target_upstream_epoch,
                ..
            } if completed_epoch == *target_upstream_epoch => {
                // Copy the epoch out before `self.status` is overwritten below.
                let target = *target_upstream_epoch;
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
                info!(
                    job_id = %self.job_id,
                    completed_epoch,
                    target_upstream_epoch = target,
                    "batch refresh job: logstore done, transitioned to idle, removing partial graph"
                );
                partial_graph_manager.remove_partial_graphs(vec![self.partial_graph_id]);
                self.status = BatchRefreshJobStatus::Idle {
                    last_committed_epoch: target,
                };
            }
            BatchRefreshJobStatus::ConsumingLogStore { .. } => {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
            }
            BatchRefreshJobStatus::Resetting { .. } => {
                // Nothing is acknowledged while the partial graph is being reset.
            }
            BatchRefreshJobStatus::Idle { .. }
            | BatchRefreshJobStatus::InitializingBatchRefresh { .. } => {
                unreachable!("batch refresh job should not be completing in this state")
            }
        }
    }

    /// Consumes the control after its partial graph has been reset, notifying every pending
    /// notifier as collected.
    ///
    /// # Panics
    ///
    /// Panics if the job is not in `Resetting`.
    pub(super) fn on_partial_graph_reset(mut self) {
        match &mut self.status {
            BatchRefreshJobStatus::Resetting { notifiers } => {
                for notifier in notifiers.drain(..) {
                    notifier.notify_collected();
                }
            }
            _ => {
                panic!(
                    "batch refresh job {}: on_partial_graph_reset in unexpected state {:?}",
                    self.job_id, self.status
                );
            }
        }
    }
}
1052
1053impl BatchRefreshJobCheckpointControl {
1056 pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
1057 match &self.status {
1058 BatchRefreshJobStatus::ConsumingSnapshot {
1059 create_mview_tracker,
1060 ..
1061 } => {
1062 let progress = if create_mview_tracker.is_finished() {
1063 "Snapshot finished".to_owned()
1064 } else {
1065 let progress = create_mview_tracker.gen_backfill_progress();
1066 format!("BatchRefresh Snapshot [{}]", progress)
1067 };
1068 Some(BackfillProgress {
1069 progress,
1070 backfill_type: PbBackfillType::SnapshotBackfill,
1071 })
1072 }
1073 BatchRefreshJobStatus::FinishingSnapshot { .. } => Some(BackfillProgress {
1074 progress: "BatchRefresh Stopping".to_owned(),
1075 backfill_type: PbBackfillType::SnapshotBackfill,
1076 }),
1077 BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1078 | BatchRefreshJobStatus::ConsumingLogStore { .. } => Some(BackfillProgress {
1079 progress: "BatchRefresh LogStore".to_owned(),
1080 backfill_type: PbBackfillType::SnapshotBackfill,
1081 }),
1082 BatchRefreshJobStatus::Idle { .. } | BatchRefreshJobStatus::Resetting { .. } => None,
1083 }
1084 }
1085
1086 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
1087 match &self.status {
1088 BatchRefreshJobStatus::ConsumingSnapshot {
1089 create_mview_tracker,
1090 fragment_infos,
1091 ..
1092 } => create_mview_tracker.collect_fragment_progress(fragment_infos, true),
1093 BatchRefreshJobStatus::FinishingSnapshot { fragment_infos, .. } => {
1094 collect_done_fragments(self.job_id, fragment_infos)
1095 }
1096 _ => vec![],
1097 }
1098 }
1099
1100 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
1102 match &self.status {
1103 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1104 | BatchRefreshJobStatus::FinishingSnapshot { .. } => (
1105 self.snapshot_epoch,
1106 self.snapshot_backfill_upstream_tables.clone(),
1107 ),
1108 BatchRefreshJobStatus::ConsumingLogStore {
1109 logstore_start_epoch,
1110 ..
1111 }
1112 | BatchRefreshJobStatus::InitializingBatchRefresh {
1113 logstore_start_epoch,
1114 ..
1115 } => (
1116 *logstore_start_epoch,
1117 self.snapshot_backfill_upstream_tables.clone(),
1118 ),
1119 BatchRefreshJobStatus::Idle {
1120 last_committed_epoch,
1121 } => (
1122 *last_committed_epoch,
1123 self.snapshot_backfill_upstream_tables.clone(),
1124 ),
1125 BatchRefreshJobStatus::Resetting { .. } => (0, HashSet::new()),
1126 }
1127 }
1128
1129 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
1130 match &self.status {
1131 BatchRefreshJobStatus::ConsumingSnapshot { fragment_infos, .. } => Some(fragment_infos),
1132 BatchRefreshJobStatus::InitializingBatchRefresh { fragment_infos, .. } => {
1133 Some(fragment_infos)
1134 }
1135 BatchRefreshJobStatus::ConsumingLogStore { fragment_infos, .. } => Some(fragment_infos),
1136 BatchRefreshJobStatus::FinishingSnapshot { .. }
1137 | BatchRefreshJobStatus::Idle { .. }
1138 | BatchRefreshJobStatus::Resetting { .. } => None,
1139 }
1140 }
1141
1142 pub(crate) fn is_snapshot_backfilling(&self) -> bool {
1143 matches!(
1144 self.status,
1145 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1146 | BatchRefreshJobStatus::FinishingSnapshot { .. }
1147 | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1148 | BatchRefreshJobStatus::ConsumingLogStore { .. }
1149 )
1150 }
1151
1152 pub(crate) fn should_start_refresh(&self, upstream_committed_epoch: u64) -> bool {
1157 if let BatchRefreshJobStatus::Idle {
1158 last_committed_epoch,
1159 } = &self.status
1160 {
1161 let job_physical_ms = Epoch(*last_committed_epoch).physical_time();
1162 let upstream_physical_ms = Epoch(upstream_committed_epoch).physical_time();
1163 let threshold_ms = self.batch_refresh_seconds * 1000;
1164 upstream_physical_ms.saturating_sub(job_physical_ms) >= threshold_ms
1165 } else {
1166 false
1167 }
1168 }
1169
1170 pub(crate) fn last_committed_epoch(&self) -> Option<u64> {
1172 if let BatchRefreshJobStatus::Idle {
1173 last_committed_epoch,
1174 } = &self.status
1175 {
1176 Some(*last_committed_epoch)
1177 } else {
1178 None
1179 }
1180 }
1181}
1182
1183impl BatchRefreshJobCheckpointControl {
1186 pub(crate) fn start_refresh_run(
1198 &mut self,
1199 context: &BatchRefreshJobTriggerContext,
1200 worker_nodes: &HashMap<WorkerId, WorkerNode>,
1201 actor_id_counter: &AtomicU32,
1202 partial_graph_manager: &mut PartialGraphManager,
1203 ) -> MetaResult<bool> {
1204 let last_committed_epoch = match &self.status {
1205 BatchRefreshJobStatus::Idle {
1206 last_committed_epoch,
1207 } => *last_committed_epoch,
1208 _ => panic!(
1209 "batch refresh job {}: start_refresh_run called in non-Idle state {:?}",
1210 self.job_id, self.status
1211 ),
1212 };
1213
1214 let target_upstream_epoch = context.target_upstream_epoch;
1216 let Some((first_epoch, pending_log_barriers)) = Self::resolve_log_epoch_barriers(
1217 &self.snapshot_backfill_upstream_tables,
1218 &context.upstream_table_log_epochs,
1219 last_committed_epoch,
1220 )?
1221 else {
1222 info!(
1223 job_id = %self.job_id,
1224 last_committed_epoch,
1225 target_upstream_epoch,
1226 "batch refresh job: no log epochs to consume, staying idle"
1227 );
1228 return Ok(false);
1229 };
1230
1231 let log_target_epoch = pending_log_barriers.last().expect("non-empty").prev_epoch();
1232 if target_upstream_epoch != log_target_epoch {
1233 info!(
1234 job_id = %self.job_id,
1235 last_committed_epoch,
1236 target_upstream_epoch,
1237 log_target_epoch,
1238 "batch refresh job: upstream target has no resolved changelog yet, staying idle"
1239 );
1240 return Ok(false);
1241 }
1242
1243 let logical = BatchRefreshLogicalFragments::from_context(context);
1245
1246 let render_result = Self::render_actors_and_build_job_info(
1248 &logical.fragments,
1249 &logical.downstreams,
1250 &context.definition,
1251 actor_id_counter,
1252 worker_nodes,
1253 &context.database_resource_group,
1254 &context.streaming_job_model,
1255 self.partial_graph_id,
1256 )?;
1257
1258 let added_actors: Vec<ActorId> = render_result
1260 .fragment_infos
1261 .values()
1262 .flat_map(|fragment| fragment.actors.keys().copied())
1263 .collect();
1264
1265 let initial_mutation = Mutation::Add(AddMutation {
1266 actor_dispatchers: Default::default(),
1267 added_actors,
1268 actor_splits: Default::default(),
1269 pause: false,
1270 subscriptions_to_add: Default::default(),
1271 backfill_nodes_to_pause: Default::default(),
1272 actor_cdc_table_snapshot_splits: None,
1273 new_upstream_sinks: Default::default(),
1274 });
1275
1276 let node_actors = &render_result.node_actors;
1277 let state_table_ids = &render_result.state_table_ids;
1278 let initial_barrier = BarrierInfo {
1279 prev_epoch: TracedEpoch::new(Epoch(last_committed_epoch)),
1280 curr_epoch: TracedEpoch::new(Epoch(first_epoch)),
1281 kind: BarrierKind::Initial,
1282 };
1283 let mut partial_graph_recoverer = partial_graph_manager.start_recover();
1284 let recover_result = partial_graph_recoverer.recover_graph(
1285 self.partial_graph_id,
1286 initial_mutation,
1287 &initial_barrier,
1288 node_actors,
1289 state_table_ids.iter().copied(),
1290 render_result.actors_to_create,
1291 BatchRefreshBarrierStats::new(self.job_id, self.snapshot_epoch),
1292 );
1293 match recover_result {
1294 Ok(()) => {
1295 let initializing_partial_graphs = partial_graph_recoverer.all_initializing();
1296 debug_assert_eq!(initializing_partial_graphs.len(), 1);
1297 debug_assert!(initializing_partial_graphs.contains(&self.partial_graph_id));
1298 }
1299 Err(e) => {
1300 partial_graph_recoverer.failed();
1301 return Err(e);
1302 }
1303 }
1304
1305 let logstore_start_epoch = last_committed_epoch;
1306
1307 info!(
1308 job_id = %self.job_id,
1309 last_committed_epoch,
1310 target_upstream_epoch,
1311 num_log_barriers = pending_log_barriers.len(),
1312 "batch refresh job: initialized logstore consumption partial graph"
1313 );
1314
1315 self.status = BatchRefreshJobStatus::InitializingBatchRefresh {
1316 fragment_infos: render_result.fragment_infos,
1317 node_actors: render_result.node_actors,
1318 state_table_ids: render_result.state_table_ids,
1319 pending_log_barriers,
1320 logstore_start_epoch,
1321 target_upstream_epoch,
1322 };
1323
1324 Ok(true)
1325 }
1326
1327 pub(crate) fn on_log_store_initialized(
1328 &mut self,
1329 partial_graph_manager: &mut PartialGraphManager,
1330 ) -> MetaResult<()> {
1331 let old_status = replace(
1332 &mut self.status,
1333 BatchRefreshJobStatus::Idle {
1334 last_committed_epoch: 0,
1335 },
1336 );
1337 let BatchRefreshJobStatus::InitializingBatchRefresh {
1338 fragment_infos,
1339 node_actors,
1340 state_table_ids,
1341 pending_log_barriers,
1342 logstore_start_epoch,
1343 target_upstream_epoch,
1344 } = old_status
1345 else {
1346 panic!(
1347 "batch refresh job {}: logstore initialized in unexpected status {:?}",
1348 self.job_id, old_status
1349 );
1350 };
1351
1352 let final_barrier_idx = pending_log_barriers.len() - 1;
1353 let mut stop_mutation = Some(Mutation::Stop(StopMutation {
1354 actors: fragment_infos
1355 .values()
1356 .flat_map(|fragment| fragment.actors.keys().copied())
1357 .collect(),
1358 dropped_sink_fragments: vec![],
1359 }));
1360 for (idx, barrier) in pending_log_barriers.into_iter().enumerate() {
1361 let is_stop_barrier = idx == final_barrier_idx;
1362 let mutation = is_stop_barrier.then(|| stop_mutation.take().expect("unused"));
1363 Self::inject_barrier(
1364 self.partial_graph_id,
1365 partial_graph_manager,
1366 &node_actors,
1367 &state_table_ids,
1368 barrier,
1369 None,
1370 mutation,
1371 vec![],
1372 None,
1373 is_stop_barrier,
1374 )?;
1375 }
1376
1377 self.status = BatchRefreshJobStatus::ConsumingLogStore {
1378 fragment_infos,
1379 logstore_start_epoch,
1380 target_upstream_epoch,
1381 };
1382 Ok(())
1383 }
1384
1385 fn resolve_log_epoch_barriers(
1391 snapshot_backfill_upstream_tables: &HashSet<TableId>,
1392 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
1393 exclusive_start_log_epoch: u64,
1394 ) -> MetaResult<Option<(u64, Vec<BarrierInfo>)>> {
1395 let table_id = snapshot_backfill_upstream_tables
1396 .iter()
1397 .next()
1398 .expect("snapshot backfill job should have upstream");
1399 let Some(epochs) = upstream_table_log_epochs.get(table_id) else {
1400 return Ok(None);
1401 };
1402
1403 let mut epochs_iter = epochs.iter().peekable();
1405 loop {
1406 match epochs_iter.peek() {
1407 Some((_, checkpoint_epoch)) if *checkpoint_epoch <= exclusive_start_log_epoch => {
1408 epochs_iter.next();
1409 }
1410 _ => break,
1411 }
1412 }
1413
1414 let mut epoch_infos = vec![];
1415 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
1416 epoch_infos.extend(
1417 non_checkpoint_epochs
1418 .iter()
1419 .copied()
1420 .map(|epoch| (epoch, false)),
1421 );
1422 epoch_infos.push((*checkpoint_epoch, true));
1423 }
1424 if epoch_infos.is_empty() {
1425 return Ok(None);
1426 }
1427
1428 let first_epoch = epoch_infos[0].0;
1429 let mut pending_non_checkpoint_epochs = vec![];
1430 let mut replay_barriers = vec![];
1431 for window in epoch_infos.windows(2) {
1432 let (prev_epoch, is_checkpoint) = window[0];
1433 let curr_epoch = window[1].0;
1434 assert!(prev_epoch > exclusive_start_log_epoch);
1435 assert!(curr_epoch > prev_epoch);
1436 pending_non_checkpoint_epochs.push(prev_epoch);
1437 let kind = if is_checkpoint {
1438 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_epochs))
1439 } else {
1440 BarrierKind::Barrier
1441 };
1442 replay_barriers.push(BarrierInfo {
1443 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
1444 curr_epoch: TracedEpoch::new(Epoch(curr_epoch)),
1445 kind,
1446 });
1447 }
1448
1449 let (last_epoch, _) = *epoch_infos.last().expect("non-empty");
1450 assert!(last_epoch > exclusive_start_log_epoch);
1451 pending_non_checkpoint_epochs.push(last_epoch);
1452 replay_barriers.push(BarrierInfo {
1453 prev_epoch: TracedEpoch::new(Epoch(last_epoch)),
1454 curr_epoch: TracedEpoch::new(Epoch(u64::MAX)),
1455 kind: BarrierKind::Checkpoint(pending_non_checkpoint_epochs),
1456 });
1457
1458 Ok(Some((first_epoch, replay_barriers)))
1459 }
1460}
1461
1462impl BatchRefreshLogicalFragments {
1463 pub(crate) fn from_context(ctx: &BatchRefreshJobTriggerContext) -> Self {
1465 Self {
1466 fragments: ctx.fragments.clone(),
1467 downstreams: ctx.downstreams.clone(),
1468 }
1469 }
1470}
1471
1472impl BatchRefreshJobCheckpointControl {
1475 pub(super) fn drop(
1477 &mut self,
1478 notifiers: &mut Vec<Notifier>,
1479 partial_graph_manager: &mut PartialGraphManager,
1480 ) -> bool {
1481 match &mut self.status {
1482 BatchRefreshJobStatus::Resetting {
1483 notifiers: existing_notifiers,
1484 ..
1485 } => {
1486 for notifier in &mut *notifiers {
1487 notifier.notify_started();
1488 }
1489 existing_notifiers.append(notifiers);
1490 true
1491 }
1492 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1493 | BatchRefreshJobStatus::FinishingSnapshot { .. }
1494 | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1495 | BatchRefreshJobStatus::ConsumingLogStore { .. } => {
1496 for notifier in &mut *notifiers {
1497 notifier.notify_started();
1498 }
1499 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1500 self.status = BatchRefreshJobStatus::Resetting {
1501 notifiers: take(notifiers),
1502 };
1503 true
1504 }
1505 BatchRefreshJobStatus::Idle { .. } => {
1506 for notifier in &mut *notifiers {
1509 notifier.notify_started();
1510 }
1511 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
1512 self.status = BatchRefreshJobStatus::Resetting {
1513 notifiers: take(notifiers),
1514 };
1515 true
1516 }
1517 }
1518 }
1519
1520 pub(crate) fn reset(self) -> bool {
1525 match self.status {
1526 BatchRefreshJobStatus::ConsumingSnapshot { .. }
1527 | BatchRefreshJobStatus::FinishingSnapshot { .. }
1528 | BatchRefreshJobStatus::InitializingBatchRefresh { .. }
1529 | BatchRefreshJobStatus::ConsumingLogStore { .. }
1530 | BatchRefreshJobStatus::Idle { .. } => false,
1531 BatchRefreshJobStatus::Resetting { notifiers, .. } => {
1532 for notifier in notifiers {
1533 notifier.notify_collected();
1534 }
1535 true
1536 }
1537 }
1538 }
1539}
1540
1541struct BatchRefreshBarrierStats {
1544 barrier_latency: LabelGuardedHistogram,
1545 inflight_barrier_num: LabelGuardedIntGauge,
1546}
1547
1548impl BatchRefreshBarrierStats {
1549 fn new(job_id: JobId, _snapshot_epoch: u64) -> Self {
1550 let table_id_str = format!("{}", job_id);
1551 Self {
1552 barrier_latency: GLOBAL_META_METRICS
1553 .snapshot_backfill_barrier_latency
1554 .with_guarded_label_values(&[table_id_str.as_str(), "batch_refresh_snapshot"]),
1555 inflight_barrier_num: GLOBAL_META_METRICS
1556 .snapshot_backfill_inflight_barrier_num
1557 .with_guarded_label_values(&[&table_id_str]),
1558 }
1559 }
1560}
1561
1562impl PartialGraphStat for BatchRefreshBarrierStats {
1563 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
1564 self.barrier_latency.observe(barrier_latency_secs);
1565 }
1566
1567 fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
1568 self.inflight_barrier_num.set(inflight_barrier_num as _);
1569 }
1570}