risingwave_meta/barrier/checkpoint/independent_job/creating_job/mod.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet, hash_map};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};
use std::time::Duration;

use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, StopMutation};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use super::super::state::RenderResult;
use super::IndependentCheckpointJobControl;
use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::independent_job::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
use crate::barrier::checkpoint::independent_job::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::command::PostCollectCommand;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::partial_graph::{
    CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
};
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
use crate::barrier::{
    BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
    TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::source_manager::SplitAssignment;
use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};

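/// Graph-topology information captured for a creating snapshot backfill job: its inflight
/// fragments, the fragment upstream/downstream relations, the upstream tables being backfilled,
/// and the stream actors built for the job.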
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub downstreams: FragmentDownstreamRelation,
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
}

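/// Per-job controller for a snapshot backfill streaming job that is still being created.
/// It owns the job's dedicated partial graph and drives the job through the states of
/// [`CreatingStreamingJobStatus`].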
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    partial_graph_id: PartialGraphId,
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    snapshot_epoch: u64,

    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    state_table_ids: HashSet<TableId>,

    max_committed_epoch: Option<u64>,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
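    /// Creates the control state for a new snapshot backfill job, registers its partial graph
    /// with `partial_graph_manager`, and injects the initial `Add` barrier. On injection failure,
    /// the partial-graph registration is rolled back and the job is put into `Resetting`.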
    pub(crate) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, IndependentCheckpointJobControl>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
        actors: &RenderResult,
    ) -> MetaResult<&'a mut Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(
                &actors.stream_actors,
                &actors.actor_location,
                split_assignment,
            )
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &fragment_infos,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create = Command::create_streaming_job_actors_to_create(
            &info,
            edges,
            &actors.stream_actors,
            &actors.actor_location,
        );

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors: Vec<ActorId> = actors
            .stream_actors
            .values()
            .flatten()
            .map(|actor| actor.actor_id)
            .collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // For the mutation of a snapshot backfill job, we don't include changes to the dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // We assume that the cluster is not paused while handling snapshot backfill.
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        let IndependentCheckpointJobControl::CreatingStreamingJob(job) = entry.insert(
            IndependentCheckpointJobControl::CreatingStreamingJob(Self {
                partial_graph_id,
                job_id,
                snapshot_backfill_upstream_tables,
                max_committed_epoch: None,
                snapshot_epoch,
                status: CreatingStreamingJobStatus::PlaceHolder, // placeholder; filled in below
                upstream_lag: GLOBAL_META_METRICS
                    .snapshot_backfill_lag
                    .with_guarded_label_values(&[&format!("{}", job_id)]),
                node_actors,
                state_table_ids,
            }),
        );

        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        );

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &job.node_actors,
            &job.state_table_ids,
            false,
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams,
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: actors
                    .stream_actors
                    .values()
                    .flatten()
                    .map(|actor| (actor.actor_id, actor.clone()))
                    .collect(),
            };
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }

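    /// Collects per-fragment backfill progress for reporting. Only meaningful while the job is
    /// consuming the snapshot or the log store; other states report nothing.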
    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                info,
                ..
            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
                collect_done_fragments(self.job_id, &info.fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(_, _)
            | CreatingStreamingJobStatus::Resetting(_)
            | CreatingStreamingJobStatus::PlaceHolder => vec![],
        }
    }

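    /// Expands the upstream table change-log epochs after `exclusive_start_log_epoch` into the
    /// list of barriers that still need to be replayed to this job, ending with a checkpoint
    /// barrier at the current upstream epoch.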
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // The snapshot backfill job has been marked as creating, but the upstream table has
            // not committed a new epoch yet, so there is no table change log.
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

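    /// Rebuilds the `ConsumingSnapshot` status during recovery, used when the job had not yet
    /// finished consuming the upstream snapshot (`committed_epoch < snapshot_epoch`).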
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

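    /// Rebuilds the `ConsumingLogStore` status during recovery, replaying the upstream change log
    /// from the job's committed epoch up to the current upstream barrier.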
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }

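    /// Recovers the control state of a creating snapshot backfill job after a meta restart,
    /// reconstructing the job's fragment relations from `fragment_relations` and re-registering
    /// its partial graph via `partial_graph_recoverer`.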
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &node_actors,
            state_table_ids.iter().copied(),
            new_actors,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            max_committed_epoch: Some(committed_epoch),
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

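    /// Renders a human-readable backfill progress string for the job, tagged as
    /// `PbBackfillType::SnapshotBackfill`.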
    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_backfill_progress();
                    format!("Snapshot [{}]", progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_backfill_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(finish_epoch, ..) => {
                let committed_epoch = self.max_committed_epoch.expect("should have committed");
                let lag = Duration::from_millis(
                    Epoch(*finish_epoch).physical_time() - Epoch(committed_epoch).physical_time(),
                );
                format!("Finishing [epoch lag: {lag:?}]")
            }
            CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        BackfillProgress {
            progress,
            backfill_type: PbBackfillType::SnapshotBackfill,
        }
    }

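    /// Returns the upstream log epoch that must stay pinned for this job (the larger of the
    /// latest committed epoch and the snapshot epoch), together with the upstream tables it reads.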
    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
        (
            max(self.max_committed_epoch.unwrap_or(0), self.snapshot_epoch),
            self.snapshot_backfill_upstream_tables.clone(),
        )
    }

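    /// Injects a barrier into the job's partial graph. While the job is still backfilling
    /// (`is_finishing == false`), its state tables are synced on the barrier; once it is
    /// finishing, no tables are synced for the injected barrier.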
    fn inject_barrier(
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: &HashSet<TableId>,
        is_finishing: bool,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        partial_graph_manager.inject_barrier(
            partial_graph_id,
            mutation,
            node_actors,
            table_ids_to_sync.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
            PartialGraphBarrierInfo::new(
                first_create_info.map_or_else(
                    PostCollectCommand::barrier,
                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
                ),
                barrier_info,
                notifiers,
                state_table_ids.clone(),
            ),
        )?;
        Ok(())
    }

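    /// Starts consuming the upstream: advances the status machine and injects a `Stop` mutation
    /// that stops all actors in the job's own partial graph, returning the captured
    /// [`CreatingJobInfo`].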
    pub(crate) fn start_consume_upstream(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.partial_graph_id,
            partial_graph_manager,
            &self.node_actors,
            &self.state_table_ids,
            true,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // stop all actors
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![], // not related to sink-into-table
            })),
            vec![], // no notifiers when starting to consume upstream
            None,
        )?;
        Ok(info)
    }

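    /// Handles a new barrier from the upstream database: updates the upstream-lag metric and
    /// injects whatever barriers the status machine decides are due for this job, forwarding the
    /// optional mutation to the status machine and attaching the notifiers to the first injected
    /// barrier.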
    pub(crate) fn on_new_upstream_barrier(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        let progress_epoch = if let Some(max_committed_epoch) = self.max_committed_epoch {
            max(max_committed_epoch, self.snapshot_epoch)
        } else {
            self.snapshot_epoch
        };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };
        {
            for (barrier_to_inject, mutation) in self
                .status
                .on_new_upstream_epoch(barrier_info, mutation.take())
            {
                Self::inject_barrier(
                    self.partial_graph_id,
                    partial_graph_manager,
                    &self.node_actors,
                    &self.state_table_ids,
                    false,
                    barrier_to_inject,
                    None,
                    mutation,
                    take(&mut notifiers),
                    None,
                )?;
            }
            assert!(mutation.is_none(), "must have consumed mutation");
            assert!(notifiers.is_empty(), "must have consumed notifiers");
        }
        Ok(())
    }

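    /// Records the backfill progress reported in a collected barrier and returns whether the job
    /// is now ready to merge into the upstream graph.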
    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
        self.status.update_progress(
            collected_barrier
                .resps
                .values()
                .flat_map(|resp| &resp.create_mview_progress),
        );
        self.should_merge_to_upstream()
    }

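    /// The job is ready to merge into the upstream once it is consuming the log store, has no
    /// more barriers queued for injection, and log store consumption has caught up.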
    pub(crate) fn should_merge_to_upstream(&self) -> bool {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            true
        } else {
            false
        }
    }
}

impl CreatingStreamingJobControl {
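    /// Tries to start completing collected barriers of this job's partial graph. Returns the
    /// epoch to commit, the collected responses, the barrier info, and whether this epoch is the
    /// job's finish epoch; returns `None` if there is nothing to complete yet.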
    pub(crate) fn start_completing(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(
        u64,
        HashMap<WorkerId, BarrierCompleteResponse>,
        PartialGraphBarrierInfo,
        bool,
    )> {
        // do not commit snapshot backfill job until upstream has committed the snapshot epoch
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(..) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        partial_graph_manager
            .start_completing(
                self.partial_graph_id,
                epoch_end_bound,
                |non_checkpoint_epoch, _, _| {
                    if let Some(finish_at_epoch) = finished_at_epoch {
                        assert!(non_checkpoint_epoch.prev < finish_at_epoch);
                    }
                },
            )
            .map(|(epoch, resps, info)| {
                let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!info.post_collect_command.should_checkpoint());
                    if epoch == finish_at_epoch {
                        // TODO: can early remove partial graph here
                        self.ack_completed(partial_graph_manager, epoch);
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                (epoch, resps, info, is_finish_epoch)
            })
    }

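    /// Acknowledges that `completed_epoch` has been committed, forwarding the ack to the partial
    /// graph and advancing `max_committed_epoch`. The ack is skipped if the job has been dropped
    /// and is resetting.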
    pub(super) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        completed_epoch: u64,
    ) {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
                if let Some(prev_max_committed_epoch) =
                    self.max_committed_epoch.replace(completed_epoch)
                {
                    assert!(completed_epoch > prev_max_committed_epoch);
                }
            }
            CreatingStreamingJobStatus::Resetting(_) => {
                // The job was dropped while the completing task was running in the background.
                // The partial graph has already been reset, so skip the ack.
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        self.status.fragment_infos()
    }

    pub fn into_tracking_job(self) -> TrackingJob {
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Resetting(..)
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }

    pub(super) fn on_partial_graph_reset(mut self) {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(notifiers) => {
                for notifier in notifiers.drain(..) {
                    notifier.notify_collected();
                }
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                panic!(
                    "should be resetting when receiving reset partial graph resp, but at {:?}",
                    self.status
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Drops a creating snapshot backfill job by directly resetting its partial graph.
    /// Returns `false` if the partial graph has already been merged into the upstream database,
    /// and `true` otherwise, meaning that the job has been dropped.
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
                true
            }
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

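    /// Consumes the control state on reset. Returns `true` if the job was `Resetting` (its
    /// pending notifiers are notified as collected), and `false` if it was still consuming or
    /// finishing.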
    pub(crate) fn reset(self) -> bool {
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::Resetting(notifiers) => {
                for notifier in notifiers {
                    notifier.notify_collected();
                }
                true
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}