risingwave_meta/barrier/checkpoint/independent_job/creating_job/
mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22use std::time::Duration;
23
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::ddl_service::PbBackfillType;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_plan::{AddMutation, StopMutation};
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use status::CreatingStreamingJobStatus;
37use tracing::{debug, info};
38
39use super::super::state::RenderResult;
40use super::IndependentCheckpointJobControl;
41use crate::MetaResult;
42use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
43use crate::barrier::checkpoint::independent_job::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
44use crate::barrier::checkpoint::independent_job::creating_job::status::CreateMviewLogStoreProgressTracker;
45use crate::barrier::command::PostCollectCommand;
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::FragmentEdgeBuildResult;
48use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{
51    CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
52};
53use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
54use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
55use crate::barrier::{
56    BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
57    TracedEpoch,
58};
59use crate::controller::fragment::InflightFragmentInfo;
60use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::stream::source_manager::SplitAssignment;
63use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
64
/// Topology of a snapshot-backfill streaming job that is still being created.
///
/// The value is stored inside the job status while the job consumes the snapshot or the
/// upstream log store, and is handed back to the caller by `start_consume_upstream`.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// Inflight fragments belonging to the creating job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Relations from upstream fragments into the job. On recovery this is rebuilt as the
    /// relations whose upstream fragment is outside the job and whose downstream fragment is
    /// one of the job's fragments, keyed by the upstream fragment id.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Downstream relations keyed by the job's own fragments.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables that the job backfills from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Actors of the job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
73
/// Barrier-side control state of one creating snapshot-backfill job, which runs in its own
/// partial graph.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Id of the streaming job being created.
    job_id: JobId,
    /// Partial graph of this job, derived from the database id and `job_id`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables that the job backfills from.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the upstream snapshot that the job backfills.
    snapshot_epoch: u64,

    /// Actors to collect barriers from, grouped by worker.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of the job's fragments.
    state_table_ids: HashSet<TableId>,

    /// Latest epoch acknowledged via `ack_completed`. `None` for a freshly created job that
    /// has not completed any epoch; a recovered job starts from its committed epoch.
    max_committed_epoch: Option<u64>,
    /// Current phase of the job.
    status: CreatingStreamingJobStatus,

    /// `snapshot_backfill_lag` gauge labelled with the job id; updated on every new upstream
    /// barrier.
    upstream_lag: LabelGuardedIntGauge,
}
89
90impl CreatingStreamingJobControl {
91    pub(crate) fn new<'a>(
92        entry: hash_map::VacantEntry<'a, JobId, IndependentCheckpointJobControl>,
93        create_info: CreateSnapshotBackfillJobCommandInfo,
94        notifiers: Vec<Notifier>,
95        snapshot_backfill_upstream_tables: HashSet<TableId>,
96        snapshot_epoch: u64,
97        version_stat: &HummockVersionStats,
98        partial_graph_manager: &mut PartialGraphManager,
99        edges: &mut FragmentEdgeBuildResult,
100        split_assignment: &SplitAssignment,
101        actors: &RenderResult,
102    ) -> MetaResult<&'a mut Self> {
103        let info = create_info.info.clone();
104        let job_id = info.stream_job_fragments.stream_job_id();
105        let database_id = info.streaming_job.database_id();
106        debug!(
107            %job_id,
108            definition = info.definition,
109            "new creating job"
110        );
111        let fragment_infos = info
112            .stream_job_fragments
113            .new_fragment_info(
114                &actors.stream_actors,
115                &actors.actor_location,
116                split_assignment,
117            )
118            .collect();
119        let snapshot_backfill_actors =
120            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
121        let backfill_nodes_to_pause =
122            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
123                .into_iter()
124                .collect();
125        let backfill_order_state = BackfillOrderState::new(
126            &info.fragment_backfill_ordering,
127            &fragment_infos,
128            info.locality_fragment_state_table_mapping.clone(),
129        );
130        let create_mview_tracker = CreateMviewProgressTracker::recover(
131            job_id,
132            &fragment_infos,
133            backfill_order_state,
134            version_stat,
135        );
136
137        let actors_to_create = Command::create_streaming_job_actors_to_create(
138            &info,
139            edges,
140            &actors.stream_actors,
141            &actors.actor_location,
142        );
143
144        let mut prev_epoch_fake_physical_time = 0;
145        let mut pending_non_checkpoint_barriers = vec![];
146
147        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
148            &mut prev_epoch_fake_physical_time,
149            &mut pending_non_checkpoint_barriers,
150            PbBarrierKind::Checkpoint,
151        );
152
153        let added_actors: Vec<ActorId> = actors
154            .stream_actors
155            .values()
156            .flatten()
157            .map(|actor| actor.actor_id)
158            .collect();
159        let actor_splits = split_assignment
160            .values()
161            .flat_map(build_actor_connector_splits)
162            .collect();
163
164        assert!(
165            info.cdc_table_snapshot_splits.is_none(),
166            "should not have cdc backfill for snapshot backfill job"
167        );
168
169        let initial_mutation = Mutation::Add(AddMutation {
170            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
171            actor_dispatchers: Default::default(),
172            added_actors,
173            actor_splits,
174            // we assume that when handling snapshot backfill, the cluster must not be paused
175            pause: false,
176            subscriptions_to_add: Default::default(),
177            backfill_nodes_to_pause,
178            actor_cdc_table_snapshot_splits: None,
179            new_upstream_sinks: Default::default(),
180        });
181
182        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
183        let state_table_ids =
184            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
185
186        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
187
188        let IndependentCheckpointJobControl::CreatingStreamingJob(job) = entry.insert(
189            IndependentCheckpointJobControl::CreatingStreamingJob(Self {
190                partial_graph_id,
191                job_id,
192                snapshot_backfill_upstream_tables,
193                max_committed_epoch: None,
194                snapshot_epoch,
195                status: CreatingStreamingJobStatus::PlaceHolder, // filled in later code
196                upstream_lag: GLOBAL_META_METRICS
197                    .snapshot_backfill_lag
198                    .with_guarded_label_values(&[&format!("{}", job_id)]),
199                node_actors,
200                state_table_ids,
201            }),
202        ) else {
203            unreachable!()
204        };
205
206        let mut graph_adder = partial_graph_manager.add_partial_graph(
207            partial_graph_id,
208            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
209        );
210
211        if let Err(e) = Self::inject_barrier(
212            partial_graph_id,
213            graph_adder.manager(),
214            &job.node_actors,
215            &job.state_table_ids,
216            false,
217            initial_barrier_info,
218            Some(actors_to_create),
219            Some(initial_mutation),
220            notifiers,
221            Some(create_info),
222        ) {
223            graph_adder.failed();
224            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
225            Err(e)
226        } else {
227            graph_adder.added();
228            assert!(pending_non_checkpoint_barriers.is_empty());
229            let job_info = CreatingJobInfo {
230                fragment_infos,
231                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
232                downstreams: info.stream_job_fragments.downstreams,
233                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
234                stream_actors: actors
235                    .stream_actors
236                    .values()
237                    .flatten()
238                    .map(|actor| (actor.actor_id, actor.clone()))
239                    .collect(),
240            };
241            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
242                prev_epoch_fake_physical_time,
243                pending_upstream_barriers: vec![],
244                version_stats: version_stat.clone(),
245                create_mview_tracker,
246                snapshot_backfill_actors,
247                snapshot_epoch,
248                info: job_info,
249                pending_non_checkpoint_barriers,
250            };
251            Ok(job)
252        }
253    }
254
255    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
256        match &self.status {
257            CreatingStreamingJobStatus::ConsumingSnapshot {
258                create_mview_tracker,
259                info,
260                ..
261            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
262            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
263                collect_done_fragments(self.job_id, &info.fragment_infos)
264            }
265            CreatingStreamingJobStatus::Finishing(_, _)
266            | CreatingStreamingJobStatus::Resetting(_)
267            | CreatingStreamingJobStatus::PlaceHolder => vec![],
268        }
269    }
270
271    fn resolve_upstream_log_epochs(
272        snapshot_backfill_upstream_tables: &HashSet<TableId>,
273        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
274        exclusive_start_log_epoch: u64,
275        upstream_barrier_info: &BarrierInfo,
276    ) -> MetaResult<Vec<BarrierInfo>> {
277        let table_id = snapshot_backfill_upstream_tables
278            .iter()
279            .next()
280            .expect("snapshot backfill job should have upstream");
281        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
282            let mut epochs_iter = epochs.iter();
283            loop {
284                let (_, checkpoint_epoch) =
285                    epochs_iter.next().expect("not reach committed epoch yet");
286                if *checkpoint_epoch < exclusive_start_log_epoch {
287                    continue;
288                }
289                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
290                break;
291            }
292            epochs_iter
293        } else {
294            // snapshot backfill job has been marked as creating, but upstream table has not committed a new epoch yet, so no table change log
295            assert_eq!(
296                upstream_barrier_info.prev_epoch(),
297                exclusive_start_log_epoch
298            );
299            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
300            EMPTY_VEC.iter()
301        };
302
303        let mut ret = vec![];
304        let mut prev_epoch = exclusive_start_log_epoch;
305        let mut pending_non_checkpoint_barriers = vec![];
306        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
307            for (i, epoch) in non_checkpoint_epochs
308                .iter()
309                .chain([checkpoint_epoch])
310                .enumerate()
311            {
312                assert!(*epoch > prev_epoch);
313                pending_non_checkpoint_barriers.push(prev_epoch);
314                ret.push(BarrierInfo {
315                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
316                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
317                    kind: if i == 0 {
318                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
319                    } else {
320                        BarrierKind::Barrier
321                    },
322                });
323                prev_epoch = *epoch;
324            }
325        }
326        ret.push(BarrierInfo {
327            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
328            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
329            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
330        });
331        Ok(ret)
332    }
333
334    fn recover_consuming_snapshot(
335        job_id: JobId,
336        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
337        snapshot_epoch: u64,
338        committed_epoch: u64,
339        upstream_barrier_info: &BarrierInfo,
340        info: CreatingJobInfo,
341        backfill_order_state: BackfillOrderState,
342        version_stat: &HummockVersionStats,
343    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
344        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
345        let mut pending_non_checkpoint_barriers = vec![];
346        let create_mview_tracker = CreateMviewProgressTracker::recover(
347            job_id,
348            &info.fragment_infos,
349            backfill_order_state,
350            version_stat,
351        );
352        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
353            &mut prev_epoch_fake_physical_time,
354            &mut pending_non_checkpoint_barriers,
355            PbBarrierKind::Initial,
356        );
357
358        Ok((
359            CreatingStreamingJobStatus::ConsumingSnapshot {
360                prev_epoch_fake_physical_time,
361                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
362                    &info.snapshot_backfill_upstream_tables,
363                    upstream_table_log_epochs,
364                    snapshot_epoch,
365                    upstream_barrier_info,
366                )?,
367                version_stats: version_stat.clone(),
368                create_mview_tracker,
369                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
370                    &info.fragment_infos,
371                )
372                .collect(),
373                info,
374                snapshot_epoch,
375                pending_non_checkpoint_barriers,
376            },
377            barrier_info,
378        ))
379    }
380
381    fn recover_consuming_log_store(
382        job_id: JobId,
383        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
384        committed_epoch: u64,
385        upstream_barrier_info: &BarrierInfo,
386        info: CreatingJobInfo,
387    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
388        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
389            &info.snapshot_backfill_upstream_tables,
390            upstream_table_log_epochs,
391            committed_epoch,
392            upstream_barrier_info,
393        )?;
394        let mut first_barrier = barriers_to_inject.remove(0);
395        assert!(first_barrier.kind.is_checkpoint());
396        first_barrier.kind = BarrierKind::Initial;
397
398        Ok((
399            CreatingStreamingJobStatus::ConsumingLogStore {
400                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
401                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
402                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
403                    barriers_to_inject
404                        .last()
405                        .map(|info| info.prev_epoch() - committed_epoch)
406                        .unwrap_or(0),
407                ),
408                barriers_to_inject: Some(barriers_to_inject),
409                info,
410            },
411            first_barrier,
412        ))
413    }
414
415    #[expect(clippy::too_many_arguments)]
416    pub(crate) fn recover(
417        database_id: DatabaseId,
418        job_id: JobId,
419        snapshot_backfill_upstream_tables: HashSet<TableId>,
420        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
421        snapshot_epoch: u64,
422        committed_epoch: u64,
423        upstream_barrier_info: &BarrierInfo,
424        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
425        backfill_order: ExtendedFragmentBackfillOrder,
426        fragment_relations: &FragmentDownstreamRelation,
427        version_stat: &HummockVersionStats,
428        new_actors: StreamJobActorsToCreate,
429        initial_mutation: Mutation,
430        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
431    ) -> MetaResult<Self> {
432        info!(
433            %job_id,
434            "recovered creating snapshot backfill job"
435        );
436
437        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
438        let state_table_ids: HashSet<_> =
439            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
440
441        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
442        for (upstream_fragment_id, downstreams) in fragment_relations {
443            if fragment_infos.contains_key(upstream_fragment_id) {
444                continue;
445            }
446            for downstream in downstreams {
447                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
448                    upstream_fragment_downstreams
449                        .entry(*upstream_fragment_id)
450                        .or_default()
451                        .push(downstream.clone());
452                }
453            }
454        }
455        let downstreams = fragment_infos
456            .keys()
457            .filter_map(|fragment_id| {
458                fragment_relations
459                    .get(fragment_id)
460                    .map(|relation| (*fragment_id, relation.clone()))
461            })
462            .collect();
463
464        let info = CreatingJobInfo {
465            fragment_infos,
466            upstream_fragment_downstreams,
467            downstreams,
468            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
469            stream_actors: new_actors
470                .values()
471                .flat_map(|fragments| {
472                    fragments.values().flat_map(|(_, actors, _)| {
473                        actors
474                            .iter()
475                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
476                    })
477                })
478                .collect(),
479        };
480
481        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
482            let locality_fragment_state_table_mapping =
483                build_locality_fragment_state_table_mapping(&info.fragment_infos);
484            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
485                &backfill_order,
486                &info.fragment_infos,
487                locality_fragment_state_table_mapping,
488            );
489            Self::recover_consuming_snapshot(
490                job_id,
491                upstream_table_log_epochs,
492                snapshot_epoch,
493                committed_epoch,
494                upstream_barrier_info,
495                info,
496                backfill_order_state,
497                version_stat,
498            )?
499        } else {
500            Self::recover_consuming_log_store(
501                job_id,
502                upstream_table_log_epochs,
503                committed_epoch,
504                upstream_barrier_info,
505                info,
506            )?
507        };
508
509        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
510
511        partial_graph_recoverer.recover_graph(
512            partial_graph_id,
513            initial_mutation,
514            &first_barrier_info,
515            &node_actors,
516            state_table_ids.iter().copied(),
517            new_actors,
518            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
519        )?;
520
521        Ok(Self {
522            job_id,
523            partial_graph_id,
524            snapshot_backfill_upstream_tables,
525            snapshot_epoch,
526            node_actors,
527            state_table_ids,
528            max_committed_epoch: Some(committed_epoch),
529            status,
530            upstream_lag: GLOBAL_META_METRICS
531                .snapshot_backfill_lag
532                .with_guarded_label_values(&[&format!("{}", job_id)]),
533        })
534    }
535
536    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
537        let progress = match &self.status {
538            CreatingStreamingJobStatus::ConsumingSnapshot {
539                create_mview_tracker,
540                ..
541            } => {
542                if create_mview_tracker.is_finished() {
543                    "Snapshot finished".to_owned()
544                } else {
545                    let progress = create_mview_tracker.gen_backfill_progress();
546                    format!("Snapshot [{}]", progress)
547                }
548            }
549            CreatingStreamingJobStatus::ConsumingLogStore {
550                log_store_progress_tracker,
551                ..
552            } => {
553                format!(
554                    "LogStore [{}]",
555                    log_store_progress_tracker.gen_backfill_progress()
556                )
557            }
558            CreatingStreamingJobStatus::Finishing(finish_epoch, ..) => {
559                let committed_epoch = self.max_committed_epoch.expect("should have committed");
560                let lag = Duration::from_millis(
561                    Epoch(*finish_epoch).physical_time() - Epoch(committed_epoch).physical_time(),
562                );
563                format!("Finishing [epoch lag: {lag:?}]",)
564            }
565            CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
566            CreatingStreamingJobStatus::PlaceHolder => {
567                unreachable!()
568            }
569        };
570        BackfillProgress {
571            progress,
572            backfill_type: PbBackfillType::SnapshotBackfill,
573        }
574    }
575
576    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
577        (
578            max(self.max_committed_epoch.unwrap_or(0), self.snapshot_epoch),
579            self.snapshot_backfill_upstream_tables.clone(),
580        )
581    }
582
583    fn inject_barrier(
584        partial_graph_id: PartialGraphId,
585        partial_graph_manager: &mut PartialGraphManager,
586        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
587        state_table_ids: &HashSet<TableId>,
588        is_finishing: bool,
589        barrier_info: BarrierInfo,
590        new_actors: Option<StreamJobActorsToCreate>,
591        mutation: Option<Mutation>,
592        notifiers: Vec<Notifier>,
593        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
594    ) -> MetaResult<()> {
595        let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
596            (Some(state_table_ids), Some(node_actors.keys().copied()))
597        } else {
598            (None, None)
599        };
600        partial_graph_manager.inject_barrier(
601            partial_graph_id,
602            mutation,
603            node_actors,
604            table_ids_to_sync.into_iter().flatten().copied(),
605            nodes_to_sync_table.into_iter().flatten(),
606            new_actors,
607            PartialGraphBarrierInfo::new(
608                first_create_info.map_or_else(
609                    PostCollectCommand::barrier,
610                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
611                ),
612                barrier_info,
613                notifiers,
614                state_table_ids.clone(),
615            ),
616        )?;
617        Ok(())
618    }
619
620    pub(crate) fn start_consume_upstream(
621        &mut self,
622        partial_graph_manager: &mut PartialGraphManager,
623        barrier_info: &BarrierInfo,
624    ) -> MetaResult<CreatingJobInfo> {
625        info!(
626            job_id = %self.job_id,
627            prev_epoch = barrier_info.prev_epoch(),
628            "start consuming upstream"
629        );
630        let info = self.status.start_consume_upstream(barrier_info);
631        Self::inject_barrier(
632            self.partial_graph_id,
633            partial_graph_manager,
634            &self.node_actors,
635            &self.state_table_ids,
636            true,
637            barrier_info.clone(),
638            None,
639            Some(Mutation::Stop(StopMutation {
640                // stop all actors
641                actors: info
642                    .fragment_infos
643                    .values()
644                    .flat_map(|info| info.actors.keys().copied())
645                    .collect(),
646                dropped_sink_fragments: vec![], // not related to sink-into-table
647            })),
648            vec![], // no notifiers when start consuming upstream
649            None,
650        )?;
651        Ok(info)
652    }
653
654    pub(crate) fn on_new_upstream_barrier(
655        &mut self,
656        partial_graph_manager: &mut PartialGraphManager,
657        barrier_info: &BarrierInfo,
658        mutation: Option<(Mutation, Vec<Notifier>)>,
659    ) -> MetaResult<()> {
660        let progress_epoch = if let Some(max_committed_epoch) = self.max_committed_epoch {
661            max(max_committed_epoch, self.snapshot_epoch)
662        } else {
663            self.snapshot_epoch
664        };
665        self.upstream_lag.set(
666            barrier_info
667                .prev_epoch
668                .value()
669                .0
670                .saturating_sub(progress_epoch) as _,
671        );
672        let (mut mutation, mut notifiers) = match mutation {
673            Some((mutation, notifiers)) => (Some(mutation), notifiers),
674            None => (None, vec![]),
675        };
676        {
677            for (barrier_to_inject, mutation) in self
678                .status
679                .on_new_upstream_epoch(barrier_info, mutation.take())
680            {
681                Self::inject_barrier(
682                    self.partial_graph_id,
683                    partial_graph_manager,
684                    &self.node_actors,
685                    &self.state_table_ids,
686                    false,
687                    barrier_to_inject,
688                    None,
689                    mutation,
690                    take(&mut notifiers),
691                    None,
692                )?;
693            }
694            assert!(mutation.is_none(), "must have consumed mutation");
695            assert!(notifiers.is_empty(), "must consumed notifiers");
696        }
697        Ok(())
698    }
699
700    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
701        self.status.update_progress(
702            collected_barrier
703                .resps
704                .values()
705                .flat_map(|resp| &resp.create_mview_progress),
706        );
707        self.should_merge_to_upstream()
708    }
709
710    pub(crate) fn should_merge_to_upstream(&self) -> bool {
711        if let CreatingStreamingJobStatus::ConsumingLogStore {
712            log_store_progress_tracker,
713            barriers_to_inject,
714            ..
715        } = &self.status
716            && barriers_to_inject.is_none()
717            && log_store_progress_tracker.is_finished()
718        {
719            true
720        } else {
721            false
722        }
723    }
724}
725
726impl CreatingStreamingJobControl {
727    pub(crate) fn start_completing(
728        &mut self,
729        partial_graph_manager: &mut PartialGraphManager,
730        min_upstream_inflight_epoch: Option<u64>,
731        upstream_committed_epoch: u64,
732    ) -> Option<(
733        u64,
734        HashMap<WorkerId, BarrierCompleteResponse>,
735        PartialGraphBarrierInfo,
736        bool,
737    )> {
738        // do not commit snapshot backfill job until upstream has committed the snapshot epoch
739        if upstream_committed_epoch < self.snapshot_epoch {
740            return None;
741        }
742        let (finished_at_epoch, epoch_end_bound) = match &self.status {
743            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
744                let epoch_end_bound = min_upstream_inflight_epoch
745                    .map(|upstream_epoch| {
746                        if upstream_epoch < *finish_at_epoch {
747                            Excluded(upstream_epoch)
748                        } else {
749                            Unbounded
750                        }
751                    })
752                    .unwrap_or(Unbounded);
753                (Some(*finish_at_epoch), epoch_end_bound)
754            }
755            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
756            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
757                None,
758                min_upstream_inflight_epoch
759                    .map(Excluded)
760                    .unwrap_or(Unbounded),
761            ),
762            CreatingStreamingJobStatus::Resetting(..) => {
763                return None;
764            }
765            CreatingStreamingJobStatus::PlaceHolder => {
766                unreachable!()
767            }
768        };
769        partial_graph_manager
770            .start_completing(
771                self.partial_graph_id,
772                epoch_end_bound,
773                |non_checkpoint_epoch, _, _| {
774                    if let Some(finish_at_epoch) = finished_at_epoch {
775                        assert!(non_checkpoint_epoch.prev < finish_at_epoch);
776                    }
777                },
778            )
779            .map(|(epoch, resps, info)| {
780                let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
781                    assert!(!info.post_collect_command.should_checkpoint());
782                    if epoch == finish_at_epoch {
783                        // TODO: can early remove partial graph here
784                        self.ack_completed(partial_graph_manager, epoch);
785                        true
786                    } else {
787                        false
788                    }
789                } else {
790                    false
791                };
792                (epoch, resps, info, is_finish_epoch)
793            })
794    }
795
796    pub(super) fn ack_completed(
797        &mut self,
798        partial_graph_manager: &mut PartialGraphManager,
799        completed_epoch: u64,
800    ) {
801        match &self.status {
802            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
803            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
804            | CreatingStreamingJobStatus::Finishing(_, _) => {
805                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
806                if let Some(prev_max_committed_epoch) =
807                    self.max_committed_epoch.replace(completed_epoch)
808                {
809                    assert!(completed_epoch > prev_max_committed_epoch);
810                }
811            }
812            CreatingStreamingJobStatus::Resetting(_) => {
813                // The job was dropped while the completing task was running in the background.
814                // The partial graph has already been reset, so skip the ack.
815            }
816            CreatingStreamingJobStatus::PlaceHolder => {
817                unreachable!()
818            }
819        }
820    }
821
822    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
823        self.status.fragment_infos()
824    }
825
826    pub fn into_tracking_job(self) -> TrackingJob {
827        match self.status {
828            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
829            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
830            | CreatingStreamingJobStatus::Resetting(..)
831            | CreatingStreamingJobStatus::PlaceHolder => {
832                unreachable!("expect finish")
833            }
834            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
835        }
836    }
837
838    pub(super) fn on_partial_graph_reset(mut self) {
839        match &mut self.status {
840            CreatingStreamingJobStatus::Resetting(notifiers) => {
841                for notifier in notifiers.drain(..) {
842                    notifier.notify_collected();
843                }
844            }
845            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
846            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
847            | CreatingStreamingJobStatus::Finishing(_, _) => {
848                panic!(
849                    "should be resetting when receiving reset partial graph resp, but at {:?}",
850                    self.status
851                )
852            }
853            CreatingStreamingJobStatus::PlaceHolder => {
854                unreachable!()
855            }
856        }
857    }
858
859    /// Drop a creating snapshot backfill job by directly resetting the partial graph
860    /// Return `false` if the partial graph has been merged to upstream database, and `true` otherwise
861    /// to mean that the job has been dropped.
862    pub(super) fn drop(
863        &mut self,
864        notifiers: &mut Vec<Notifier>,
865        partial_graph_manager: &mut PartialGraphManager,
866    ) -> bool {
867        match &mut self.status {
868            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
869                for notifier in &mut *notifiers {
870                    notifier.notify_started();
871                }
872                existing_notifiers.append(notifiers);
873                true
874            }
875            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
876            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
877                for notifier in &mut *notifiers {
878                    notifier.notify_started();
879                }
880                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
881                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
882                true
883            }
884            CreatingStreamingJobStatus::Finishing(_, _) => false,
885            CreatingStreamingJobStatus::PlaceHolder => {
886                unreachable!()
887            }
888        }
889    }
890
891    pub(crate) fn reset(self) -> bool {
892        match self.status {
893            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
894            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
895            | CreatingStreamingJobStatus::Finishing(_, _) => false,
896            CreatingStreamingJobStatus::Resetting(notifiers) => {
897                for notifier in notifiers {
898                    notifier.notify_collected();
899                }
900                true
901            }
902            CreatingStreamingJobStatus::PlaceHolder => {
903                unreachable!()
904            }
905        }
906    }
907}