risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::{DispatcherType, WorkerId};
30use risingwave_pb::ddl_service::PbBackfillType;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::barrier::PbBarrierKind;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{AddMutation, StopMutation};
37use risingwave_pb::stream_service::BarrierCompleteResponse;
38use status::CreatingStreamingJobStatus;
39use tracing::{debug, info};
40
41use crate::MetaResult;
42use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
43use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
44use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
45use crate::barrier::edge_builder::FragmentEdgeBuildResult;
46use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphRecoverer};
49use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
50use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
51use crate::barrier::{
52    BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::InflightFragmentInfo;
55use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
56use crate::rpc::metrics::GLOBAL_META_METRICS;
57use crate::stream::source_manager::SplitAssignment;
58use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
59
/// Static information about a creating (snapshot-backfill) streaming job,
/// captured when the job is first injected or recovered and carried through
/// its lifecycle in [`CreatingStreamingJobStatus`].
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    // All fragments of the creating job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    // Downstream relations whose upstream fragment is *outside* this job and
    // whose downstream fragment is inside it (see how `recover` builds this).
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    // Downstream relations originating from fragments of this job.
    pub downstreams: FragmentDownstreamRelation,
    // Upstream tables this job snapshot-backfills from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    // Actor definitions of this job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
68
/// Barrier-injection controller for a single creating snapshot-backfill job.
/// Owns the job's private partial graph and drives the job through the phases
/// tracked by `status` (consuming snapshot -> consuming log store -> finishing).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    // Id of the per-job partial graph, derived from database id + job id.
    partial_graph_id: PartialGraphId,
    // Upstream tables the job backfills from.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    // Upstream epoch at which the backfill snapshot was taken.
    snapshot_epoch: u64,

    // Actors to collect barriers from, grouped by worker node.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    // State tables owned by this job.
    state_table_ids: HashSet<TableId>,

    // Tracks inflight / collected / completing barriers of this job.
    barrier_control: CreatingStreamingJobBarrierControl,
    // Current phase of the creating job.
    status: CreatingStreamingJobStatus,

    // Gauge reporting how far this job lags behind the upstream epoch.
    upstream_lag: LabelGuardedIntGauge,
}
84
85fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
86    let mut has_unreschedulable_scan = false;
87    visit_stream_node_cont(&fragment.nodes, |node| {
88        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
89            let scan_type = stream_scan.stream_scan_type();
90            if !scan_type.is_reschedulable(true) {
91                has_unreschedulable_scan = true;
92                return false;
93            }
94        }
95        true
96    });
97    has_unreschedulable_scan
98}
99
100fn collect_fragment_upstream_fragment_ids(
101    fragment: &InflightFragmentInfo,
102    upstream_fragment_ids: &mut HashSet<FragmentId>,
103) {
104    visit_stream_node_cont(&fragment.nodes, |node| {
105        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
106            upstream_fragment_ids.insert(merge.upstream_fragment_id);
107        }
108        true
109    });
110}
111
112impl CreatingStreamingJobControl {
    /// Creates the control state for a brand-new snapshot-backfill job, adds a
    /// dedicated partial graph for it, and injects the initial (fake-epoch)
    /// checkpoint barrier carrying the `Add` mutation that spawns the actors.
    ///
    /// On injection failure the job is left in `Resetting` state (and the
    /// partial graph addition is rolled back via `graph_adder.failed()`);
    /// on success the job starts in `ConsumingSnapshot`.
    pub(super) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, Self>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<&'a mut Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Backfill nodes with dependencies start paused and are resumed by the
        // backfill order control as their dependencies finish.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [], // no subscribers for newly creating job
                    )
                },
            ));

        let barrier_control = CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, None);

        // While consuming the snapshot the job runs on fake epochs generated
        // from this local physical time counter (see `new_fake_barrier`).
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Insert the job first with a placeholder status so the barrier
        // injection below can borrow its fields; the real status is filled in
        // after injection succeeds (or `Resetting` on failure).
        let job = entry.insert(Self {
            partial_graph_id,
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::PlaceHolder, // filled in later code
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        });

        let mut graph_adder = partial_graph_manager.add_partial_graph(partial_graph_id);

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &mut job.barrier_control,
            &job.node_actors,
            Some(&job.state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            // Injection failed: roll back the partial graph and park the job
            // in `Resetting` so it can be cleaned up.
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            // The initial barrier was a checkpoint, so nothing may be pending.
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams.clone(),
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: info
                    .stream_job_fragments
                    .fragments
                    .values()
                    .flat_map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|actor| (actor.actor_id, actor.clone()))
                    })
                    .collect(),
            };
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }
271
    /// Collects per-fragment backfill progress for reporting.
    ///
    /// While consuming the snapshot the tracker reports live progress; while
    /// consuming the log store all fragments are reported as done; in the
    /// remaining phases there is nothing to report.
    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                info,
                ..
            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
                collect_done_fragments(self.job_id, &info.fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(_, _)
            | CreatingStreamingJobStatus::Resetting(_)
            | CreatingStreamingJobStatus::PlaceHolder => vec![],
        }
    }
287
    /// Reconstructs the barriers the creating job still has to consume from the
    /// upstream table change log, from `exclusive_start_log_epoch` (exclusive)
    /// up to the current upstream barrier.
    ///
    /// All upstream tables of one snapshot-backfill job share the same epoch
    /// history, so the log of an arbitrary upstream table is consulted.
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            // Advance the iterator past log entries that are already at or
            // before `exclusive_start_log_epoch`; the start epoch must appear
            // in the log exactly.
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // snapshot backfill job has been marked as creating, but upstream table has not committed a new epoch yet, so no table change log
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        // Each log entry is a run of non-checkpoint epochs followed by the
        // checkpoint epoch that committed them.
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    // NOTE(review): the first barrier of each log entry
                    // (i == 0), whose prev_epoch is the previous checkpoint
                    // epoch, is marked as a checkpoint carrying the epochs
                    // accumulated so far — confirm this matches the log-store
                    // replay protocol.
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        // Final barrier catches up to the current upstream epoch; it is always
        // a checkpoint so any remaining pending epochs get committed.
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }
350
    /// Rebuilds the `ConsumingSnapshot` status for a job recovered before it
    /// finished its snapshot phase (`committed_epoch < snapshot_epoch`).
    ///
    /// Returns the status plus the first (fake-epoch `Initial`) barrier to
    /// inject into the recovered partial graph.
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        // Resume the fake-epoch clock from the last committed epoch.
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                // Upstream barriers accumulated since the snapshot epoch that
                // must be replayed once the snapshot phase completes.
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }
397
    /// Rebuilds the `ConsumingLogStore` status for a job recovered after its
    /// snapshot phase (`committed_epoch >= snapshot_epoch`).
    ///
    /// Returns the status plus the first barrier to inject, which is the first
    /// resolved log epoch re-tagged as `Initial`.
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        // `resolve_upstream_log_epochs` always returns at least the catch-up
        // checkpoint barrier, so `remove(0)` cannot panic.
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    // Initial lag estimate: distance from the committed epoch
                    // to the last barrier that still has to be injected.
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }
431
    /// Recovers the control state of a creating snapshot-backfill job after a
    /// meta restart: rebuilds the job info from persisted fragment data,
    /// chooses the phase to resume in (snapshot vs. log store) based on how far
    /// the job had committed, and re-registers the job's partial graph with its
    /// first barrier.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );
        let barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, Some(committed_epoch));

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // Relations whose upstream fragment lies *outside* this job but whose
        // downstream fragment belongs to it.
        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                // Upstream is inside the job: belongs to `downstreams` below.
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        // Relations whose upstream fragment belongs to this job.
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        // Snapshot not yet committed -> resume the snapshot phase; otherwise
        // resume replaying the upstream log.
        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &node_actors,
            state_table_ids.iter().copied(),
            new_actors,
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }
553
554    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
555        self.status
556            .fragment_infos()
557            .map(|fragment_infos| {
558                !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
559            })
560            .unwrap_or(true)
561    }
562
563    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
564        let progress = match &self.status {
565            CreatingStreamingJobStatus::ConsumingSnapshot {
566                create_mview_tracker,
567                ..
568            } => {
569                if create_mview_tracker.is_finished() {
570                    "Snapshot finished".to_owned()
571                } else {
572                    let progress = create_mview_tracker.gen_backfill_progress();
573                    format!("Snapshot [{}]", progress)
574                }
575            }
576            CreatingStreamingJobStatus::ConsumingLogStore {
577                log_store_progress_tracker,
578                ..
579            } => {
580                format!(
581                    "LogStore [{}]",
582                    log_store_progress_tracker.gen_backfill_progress()
583                )
584            }
585            CreatingStreamingJobStatus::Finishing(..) => {
586                format!(
587                    "Finishing [epoch count: {}]",
588                    self.barrier_control.inflight_barrier_count()
589                )
590            }
591            CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
592            CreatingStreamingJobStatus::PlaceHolder => {
593                unreachable!()
594            }
595        };
596        BackfillProgress {
597            progress,
598            backfill_type: PbBackfillType::SnapshotBackfill,
599        }
600    }
601
602    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
603        (
604            max(
605                self.barrier_control.max_committed_epoch().unwrap_or(0),
606                self.snapshot_epoch,
607            ),
608            self.snapshot_backfill_upstream_tables.clone(),
609        )
610    }
611
    /// Injects one barrier into the job's partial graph and enqueues its epoch
    /// in `barrier_control`.
    ///
    /// When `state_table_ids` is `Some`, all worker nodes are asked to sync
    /// those tables for this barrier; when `None`, no table sync is requested.
    /// `notifiers` are notified as started once injection succeeds, and
    /// `first_create_info` is attached only for the job's very first barrier.
    fn inject_barrier(
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        mut notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        let (state_table_ids, nodes_to_sync_table) = if let Some(state_table_ids) = state_table_ids
        {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        partial_graph_manager.inject_barrier(
            partial_graph_id,
            mutation,
            &barrier_info,
            node_actors,
            // `Option<iter>` flattened: empty iterators when no sync requested.
            state_table_ids.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
        )?;
        notifiers.iter_mut().for_each(|n| n.notify_started());
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            barrier_info.kind.clone(),
            notifiers,
            first_create_info,
        );
        Ok(())
    }
648
    /// Transitions the job from log-store consumption to consuming the
    /// upstream directly: advances `status`, then injects a `Stop` mutation
    /// that stops all of the job's (backfill) actors in its partial graph.
    ///
    /// Returns the job info taken out of the previous status.
    pub(super) fn start_consume_upstream(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.partial_graph_id,
            partial_graph_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // stop all actors
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![], // not related to sink-into-table
            })),
            vec![], // no notifiers when start consuming upstream
            None,
        )?;
        Ok(info)
    }
682
    /// Handles a new upstream barrier: updates the lag metric, lets `status`
    /// translate the upstream epoch into zero or more barriers to inject into
    /// this job's partial graph, and injects each of them.
    ///
    /// The optional `mutation` (with its notifiers) is attached to the first
    /// injected barrier that consumes it; both must be fully consumed.
    pub(super) fn on_new_upstream_barrier(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        // Progress = the later of the last committed epoch and snapshot epoch.
        let progress_epoch =
            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
                max(max_committed_epoch, self.snapshot_epoch)
            } else {
                self.snapshot_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };
        {
            for (barrier_to_inject, mutation) in self
                .status
                .on_new_upstream_epoch(barrier_info, mutation.take())
            {
                Self::inject_barrier(
                    self.partial_graph_id,
                    partial_graph_manager,
                    &mut self.barrier_control,
                    &self.node_actors,
                    Some(&self.state_table_ids),
                    barrier_to_inject,
                    None,
                    mutation,
                    // Notifiers go with the first injected barrier only.
                    take(&mut notifiers),
                    None,
                )?;
            }
            assert!(mutation.is_none(), "must have consumed mutation");
            assert!(notifiers.is_empty(), "must consumed notifiers");
        }
        Ok(())
    }
729
730    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier) -> bool {
731        self.status.update_progress(
732            collected_barrier
733                .resps
734                .values()
735                .flat_map(|resp| &resp.create_mview_progress),
736        );
737        self.barrier_control.collect(collected_barrier);
738        self.should_merge_to_upstream()
739    }
740
741    pub(super) fn should_merge_to_upstream(&self) -> bool {
742        if let CreatingStreamingJobStatus::ConsumingLogStore {
743            log_store_progress_tracker,
744            barriers_to_inject,
745            ..
746        } = &self.status
747            && barriers_to_inject.is_none()
748            && log_store_progress_tracker.is_finished()
749        {
750            true
751        } else {
752            false
753        }
754    }
755}
756
/// Classifies an epoch returned by [`CreatingStreamingJobControl::start_completing`].
pub(super) enum CompleteJobType {
    /// The first barrier, carrying the job's original create-command info
    First(CreateSnapshotBackfillJobCommandInfo),
    /// An ordinary intermediate barrier
    Normal,
    /// The last barrier to complete
    Finished,
}
764
765impl CreatingStreamingJobControl {
    /// Picks the next collected epoch of this job to complete (commit), if any.
    ///
    /// Completion is bounded so the job never commits past the upstream's
    /// min inflight epoch, and it does not start at all until the upstream has
    /// committed the snapshot epoch. Returns the epoch, the collected barrier
    /// responses, and how the completion should be classified.
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        // do not commit snapshot backfill job until upstream has committed the snapshot epoch
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                // While finishing, the finish epoch itself may be completed
                // (Unbounded) unless the upstream is still behind it.
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(..) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, create_job_info)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    // The first barrier's create info must have been consumed
                    // long before the finishing phase.
                    assert!(create_job_info.is_none());
                    if epoch == finish_at_epoch {
                        // Completing the finish epoch drains the control.
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if let Some(info) = create_job_info {
                    CompleteJobType::First(info)
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }
822
    /// Forward the acknowledgement that `completed_epoch` has finished
    /// completing to the barrier control, so it can release that barrier.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }
826
    /// The ids of the state tables belonging to this creating job.
    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }
830
831    pub fn fragment_infos_with_job_id(
832        &self,
833    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
834        self.status
835            .fragment_infos()
836            .into_iter()
837            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
838    }
839
840    pub(super) fn collect_reschedule_blocked_fragment_ids(
841        &self,
842        blocked_fragment_ids: &mut HashSet<FragmentId>,
843    ) {
844        let Some(info) = self.status.creating_job_info() else {
845            return;
846        };
847
848        for (fragment_id, fragment) in &info.fragment_infos {
849            if fragment_has_online_unreschedulable_scan(fragment) {
850                blocked_fragment_ids.insert(*fragment_id);
851                collect_fragment_upstream_fragment_ids(fragment, blocked_fragment_ids);
852            }
853        }
854    }
855
856    pub(super) fn collect_no_shuffle_fragment_relations(
857        &self,
858        no_shuffle_relations: &mut Vec<(FragmentId, FragmentId)>,
859    ) {
860        let Some(info) = self.status.creating_job_info() else {
861            return;
862        };
863
864        for (upstream_fragment_id, downstreams) in &info.upstream_fragment_downstreams {
865            no_shuffle_relations.extend(
866                downstreams
867                    .iter()
868                    .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
869                    .map(|downstream| (*upstream_fragment_id, downstream.downstream_fragment_id)),
870            );
871        }
872
873        for (fragment_id, downstreams) in &info.downstreams {
874            no_shuffle_relations.extend(
875                downstreams
876                    .iter()
877                    .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
878                    .map(|downstream| (*fragment_id, downstream.downstream_fragment_id)),
879            );
880        }
881    }
882
883    pub fn into_tracking_job(self) -> TrackingJob {
884        assert!(self.barrier_control.is_empty());
885        match self.status {
886            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
887            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
888            | CreatingStreamingJobStatus::Resetting(..)
889            | CreatingStreamingJobStatus::PlaceHolder => {
890                unreachable!("expect finish")
891            }
892            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
893        }
894    }
895
896    pub(super) fn on_partial_graph_reset(mut self) {
897        match &mut self.status {
898            CreatingStreamingJobStatus::Resetting(notifiers) => {
899                for notifier in notifiers.drain(..) {
900                    notifier.notify_collected();
901                }
902            }
903            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
904            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
905            | CreatingStreamingJobStatus::Finishing(_, _) => {
906                panic!(
907                    "should be resetting when receiving reset partial graph resp, but at {:?}",
908                    self.status
909                )
910            }
911            CreatingStreamingJobStatus::PlaceHolder => {
912                unreachable!()
913            }
914        }
915    }
916
917    /// Drop a creating snapshot backfill job by directly resetting the partial graph
918    /// Return `false` if the partial graph has been merged to upstream database, and `true` otherwise
919    /// to mean that the job has been dropped.
920    pub(super) fn drop(
921        &mut self,
922        notifiers: &mut Vec<Notifier>,
923        partial_graph_manager: &mut PartialGraphManager,
924    ) -> bool {
925        match &mut self.status {
926            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
927                for notifier in &mut *notifiers {
928                    notifier.notify_started();
929                }
930                existing_notifiers.append(notifiers);
931                true
932            }
933            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
934            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
935                for notifier in &mut *notifiers {
936                    notifier.notify_started();
937                }
938                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
939                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
940                true
941            }
942            CreatingStreamingJobStatus::Finishing(_, _) => false,
943            CreatingStreamingJobStatus::PlaceHolder => {
944                unreachable!()
945            }
946        }
947    }
948
949    pub(super) fn reset(self) -> bool {
950        match self.status {
951            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
952            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
953            | CreatingStreamingJobStatus::Finishing(_, _) => false,
954            CreatingStreamingJobStatus::Resetting(notifiers) => {
955                for notifier in notifiers {
956                    notifier.notify_collected();
957                }
958                true
959            }
960            CreatingStreamingJobStatus::PlaceHolder => {
961                unreachable!()
962            }
963        }
964    }
965}