risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::{DispatcherType, WorkerId};
30use risingwave_pb::ddl_service::PbBackfillType;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::barrier::PbBarrierKind;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{AddMutation, StopMutation};
37use risingwave_pb::stream_service::BarrierCompleteResponse;
38use status::CreatingStreamingJobStatus;
39use tracing::{debug, info};
40
41use super::state::RenderResult;
42use crate::MetaResult;
43use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
44use crate::barrier::checkpoint::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
45use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
46use crate::barrier::command::PostCollectCommand;
47use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
48use crate::barrier::edge_builder::FragmentEdgeBuildResult;
49use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
50use crate::barrier::notifier::Notifier;
51use crate::barrier::partial_graph::{
52    CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
53};
54use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
55use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
56use crate::barrier::{
57    BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
58    TracedEpoch,
59};
60use crate::controller::fragment::InflightFragmentInfo;
61use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
62use crate::rpc::metrics::GLOBAL_META_METRICS;
63use crate::stream::source_manager::SplitAssignment;
64use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
65
/// Snapshot of a creating job's graph information, carried inside the
/// creating-job status while the job is still being created.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// Inflight info of every fragment of the creating job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Edges whose upstream fragment lies outside the job and whose downstream
    /// fragment belongs to the job (see how `recover` builds this field).
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Edges from fragments of the job to their downstream fragments.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables the snapshot backfill reads from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// All stream actors of the job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
74
/// Barrier-side controller for a single snapshot-backfill streaming job that
/// is still being created. Barriers of the job flow through a dedicated
/// partial graph identified by `partial_graph_id`.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    /// Id of the dedicated partial graph this job's barriers are injected into.
    partial_graph_id: PartialGraphId,
    /// Upstream tables the snapshot backfill reads from.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the upstream snapshot the job backfills from.
    snapshot_epoch: u64,

    /// Actors of the job, grouped by the worker node they run on.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// All existing state tables of the job's fragments.
    state_table_ids: HashSet<TableId>,

    /// Tracks epochs of barriers injected into / collected from the job's graph.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase of the creation (consuming snapshot / log store / finishing / resetting).
    status: CreatingStreamingJobStatus,

    /// Gauge reporting how far (in epoch distance) the job lags behind upstream.
    upstream_lag: LabelGuardedIntGauge,
}
90
91fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
92    let mut has_unreschedulable_scan = false;
93    visit_stream_node_cont(&fragment.nodes, |node| {
94        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
95            let scan_type = stream_scan.stream_scan_type();
96            if !scan_type.is_reschedulable(true) {
97                has_unreschedulable_scan = true;
98                return false;
99            }
100        }
101        true
102    });
103    has_unreschedulable_scan
104}
105
106fn collect_fragment_upstream_fragment_ids(
107    fragment: &InflightFragmentInfo,
108    upstream_fragment_ids: &mut HashSet<FragmentId>,
109) {
110    visit_stream_node_cont(&fragment.nodes, |node| {
111        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
112            upstream_fragment_ids.insert(merge.upstream_fragment_id);
113        }
114        true
115    });
116}
117
118impl CreatingStreamingJobControl {
    /// Creates a new snapshot-backfill creating job and injects its first
    /// barrier into a freshly registered partial graph.
    ///
    /// The job is inserted into `entry` before the partial graph is registered
    /// so the barrier injection can borrow its fields; its `status` is
    /// temporarily `PlaceHolder` and filled in at the end. On injection
    /// failure, the partial-graph registration is rolled back, the job is left
    /// in `Resetting` state, and the error is propagated.
    pub(super) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, Self>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
        actors: &RenderResult,
    ) -> MetaResult<&'a mut Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        // Build per-fragment inflight info from the rendered actors and split assignment.
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(
                &actors.stream_actors,
                &actors.actor_location,
                split_assignment,
            )
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Backfill nodes with dependencies start paused and are resumed by the
        // backfill order control.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &fragment_infos,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create = Command::create_streaming_job_actors_to_create(
            &info,
            edges,
            &actors.stream_actors,
            &actors.actor_location,
        );

        let barrier_control = CreatingStreamingJobBarrierControl::new(job_id, None);

        // The first barrier uses a fake epoch generated from a job-local fake
        // physical clock (see `CreatingStreamingJobStatus::new_fake_barrier`).
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors: Vec<ActorId> = actors
            .stream_actors
            .values()
            .flatten()
            .map(|actor| actor.actor_id)
            .collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // The job gets its own partial graph, derived from database and job id.
        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        let job = entry.insert(Self {
            partial_graph_id,
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::PlaceHolder, // filled in later code
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        });

        // Register the partial graph; the adder lets us commit or roll back
        // the registration depending on whether the first injection succeeds.
        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        );

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &mut job.barrier_control,
            &job.node_actors,
            &job.state_table_ids,
            false,
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            // Roll back the graph registration and leave the job resettable.
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            // The initial barrier is a checkpoint, so no non-checkpoint
            // barriers may be left pending.
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams,
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: actors
                    .stream_actors
                    .values()
                    .flatten()
                    .map(|actor| (actor.actor_id, actor.clone()))
                    .collect(),
            };
            // Replace the placeholder with the real initial status.
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }
281
282    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
283        match &self.status {
284            CreatingStreamingJobStatus::ConsumingSnapshot {
285                create_mview_tracker,
286                info,
287                ..
288            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
289            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
290                collect_done_fragments(self.job_id, &info.fragment_infos)
291            }
292            CreatingStreamingJobStatus::Finishing(_, _)
293            | CreatingStreamingJobStatus::Resetting(_)
294            | CreatingStreamingJobStatus::PlaceHolder => vec![],
295        }
296    }
297
    /// Translates the upstream table's change-log epochs into the sequence of
    /// `BarrierInfo`s this creating job still needs to replay: strictly after
    /// `exclusive_start_log_epoch`, ending at the current upstream barrier's
    /// epoch (the final entry is always a checkpoint barrier).
    ///
    /// Only the first upstream table's log is consulted — presumably all
    /// upstream tables of one snapshot backfill job share the same epoch
    /// history; TODO confirm.
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            // Advance past already-replayed log entries: skip entries whose
            // checkpoint epoch precedes the exclusive start, and require that
            // the start epoch itself appears as a checkpoint in the log. The
            // iterator is left positioned just after that entry.
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // snapshot backfill job has been marked as creating, but upstream table has not committed a new epoch yet, so no table change log
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        // Accumulates prev epochs of barriers since the last checkpoint barrier;
        // drained into the next `BarrierKind::Checkpoint`.
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            // Each log entry contributes its non-checkpoint epochs followed by
            // its checkpoint epoch.
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    // NOTE(review): the checkpoint kind is assigned to the
                    // *first* epoch of each log entry (i == 0), not to the
                    // entry's `checkpoint_epoch` — confirm this matches how the
                    // table change log groups epochs.
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        // Close the sequence with a checkpoint barrier reaching the upstream's
        // current epoch, carrying any remaining pending epochs.
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }
360
    /// Rebuilds the `ConsumingSnapshot` status for a job recovered before its
    /// snapshot phase completed (`committed_epoch < snapshot_epoch`).
    ///
    /// Returns the rebuilt status together with the first barrier to inject,
    /// which is a fake-epoch `Initial` barrier derived from the committed
    /// epoch's physical time.
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        // Resume the fake epoch clock from the last committed epoch.
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                // Upstream barriers after the snapshot epoch are replayed from
                // the upstream table change log.
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }
407
    /// Rebuilds the `ConsumingLogStore` status for a job recovered after its
    /// snapshot phase (`committed_epoch >= snapshot_epoch`).
    ///
    /// The first barrier resolved from the upstream log is repurposed as the
    /// job's `Initial` barrier and returned separately; the rest are queued in
    /// `barriers_to_inject`.
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        // `resolve_upstream_log_epochs` always returns at least the final
        // checkpoint barrier, so the list is non-empty.
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    // Initial lag estimate: distance from the committed epoch
                    // to the last pending barrier's prev epoch (0 if nothing
                    // remains to inject).
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }
441
    /// Rebuilds a `CreatingStreamingJobControl` for a snapshot backfill job
    /// found still-creating during recovery, and re-registers its partial
    /// graph via `partial_graph_recoverer`.
    ///
    /// The recovered phase depends on how far the job had progressed:
    /// `ConsumingSnapshot` when `committed_epoch < snapshot_epoch`, otherwise
    /// `ConsumingLogStore`.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );
        let barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, Some(committed_epoch));

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // Collect edges entering the job from outside: upstream fragment is not
        // part of the job, downstream fragment is.
        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        // Edges leaving the job's own fragments.
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        // Pick the recovered phase from how far the job had committed.
        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Re-register the partial graph and inject the recovered first barrier
        // together with the initial mutation and the actors to rebuild.
        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &node_actors,
            state_table_ids.iter().copied(),
            new_actors,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }
564
565    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
566        self.status
567            .fragment_infos()
568            .map(|fragment_infos| {
569                !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
570            })
571            .unwrap_or(true)
572    }
573
574    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
575        let progress = match &self.status {
576            CreatingStreamingJobStatus::ConsumingSnapshot {
577                create_mview_tracker,
578                ..
579            } => {
580                if create_mview_tracker.is_finished() {
581                    "Snapshot finished".to_owned()
582                } else {
583                    let progress = create_mview_tracker.gen_backfill_progress();
584                    format!("Snapshot [{}]", progress)
585                }
586            }
587            CreatingStreamingJobStatus::ConsumingLogStore {
588                log_store_progress_tracker,
589                ..
590            } => {
591                format!(
592                    "LogStore [{}]",
593                    log_store_progress_tracker.gen_backfill_progress()
594                )
595            }
596            CreatingStreamingJobStatus::Finishing(..) => {
597                format!(
598                    "Finishing [epoch count: {}]",
599                    self.barrier_control.inflight_barrier_count()
600                )
601            }
602            CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
603            CreatingStreamingJobStatus::PlaceHolder => {
604                unreachable!()
605            }
606        };
607        BackfillProgress {
608            progress,
609            backfill_type: PbBackfillType::SnapshotBackfill,
610        }
611    }
612
613    pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
614        (
615            max(
616                self.barrier_control.max_committed_epoch().unwrap_or(0),
617                self.snapshot_epoch,
618            ),
619            self.snapshot_backfill_upstream_tables.clone(),
620        )
621    }
622
    /// Injects a single barrier into the job's partial graph and enqueues its
    /// prev epoch into the barrier control.
    ///
    /// While the job is not finishing, the barrier syncs all of the job's
    /// state tables on all of its worker nodes; once finishing, no sync info
    /// is attached. `first_create_info` is passed only for the very first
    /// barrier so that its post-collect command carries the job-creation info.
    #[expect(clippy::too_many_arguments)]
    fn inject_barrier(
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: &HashSet<TableId>,
        is_finishing: bool,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        // Sync all state tables on all nodes unless the job is finishing.
        let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        // Saved before `barrier_info` is moved into the injection below.
        let prev_epoch = barrier_info.prev_epoch();
        partial_graph_manager.inject_barrier(
            partial_graph_id,
            mutation,
            node_actors,
            table_ids_to_sync.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
            PartialGraphBarrierInfo::new(
                // First barrier carries the create-job command; later barriers
                // carry a plain barrier post-collect command.
                first_create_info.map_or_else(
                    PostCollectCommand::barrier,
                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
                ),
                barrier_info,
                notifiers,
                state_table_ids.clone(),
            ),
        )?;
        barrier_control.enqueue_epoch(prev_epoch);
        Ok(())
    }
663
    /// Switches the job into consuming the upstream directly: transitions the
    /// status, then injects a finishing barrier carrying a `Stop` mutation for
    /// all of the job's actors. Returns the job info extracted from the
    /// previous status.
    pub(super) fn start_consume_upstream(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.partial_graph_id,
            partial_graph_manager,
            &mut self.barrier_control,
            &self.node_actors,
            &self.state_table_ids,
            true, // is_finishing: skip state-table sync info from here on
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // stop all actors
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![], // not related to sink-into-table
            })),
            vec![], // no notifiers when start consuming upstream
            None,
        )?;
        Ok(info)
    }
698
699    pub(super) fn on_new_upstream_barrier(
700        &mut self,
701        partial_graph_manager: &mut PartialGraphManager,
702        barrier_info: &BarrierInfo,
703        mutation: Option<(Mutation, Vec<Notifier>)>,
704    ) -> MetaResult<()> {
705        let progress_epoch =
706            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
707                max(max_committed_epoch, self.snapshot_epoch)
708            } else {
709                self.snapshot_epoch
710            };
711        self.upstream_lag.set(
712            barrier_info
713                .prev_epoch
714                .value()
715                .0
716                .saturating_sub(progress_epoch) as _,
717        );
718        let (mut mutation, mut notifiers) = match mutation {
719            Some((mutation, notifiers)) => (Some(mutation), notifiers),
720            None => (None, vec![]),
721        };
722        {
723            for (barrier_to_inject, mutation) in self
724                .status
725                .on_new_upstream_epoch(barrier_info, mutation.take())
726            {
727                Self::inject_barrier(
728                    self.partial_graph_id,
729                    partial_graph_manager,
730                    &mut self.barrier_control,
731                    &self.node_actors,
732                    &self.state_table_ids,
733                    false,
734                    barrier_to_inject,
735                    None,
736                    mutation,
737                    take(&mut notifiers),
738                    None,
739                )?;
740            }
741            assert!(mutation.is_none(), "must have consumed mutation");
742            assert!(notifiers.is_empty(), "must consumed notifiers");
743        }
744        Ok(())
745    }
746
747    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
748        self.status.update_progress(
749            collected_barrier
750                .resps
751                .values()
752                .flat_map(|resp| &resp.create_mview_progress),
753        );
754        self.barrier_control.collect(collected_barrier);
755        self.should_merge_to_upstream()
756    }
757
758    pub(super) fn should_merge_to_upstream(&self) -> bool {
759        if let CreatingStreamingJobStatus::ConsumingLogStore {
760            log_store_progress_tracker,
761            barriers_to_inject,
762            ..
763        } = &self.status
764            && barriers_to_inject.is_none()
765            && log_store_progress_tracker.is_finished()
766        {
767            true
768        } else {
769            false
770        }
771    }
772}
773
impl CreatingStreamingJobControl {
    /// Try to pop the next collected barrier of this creating job that is ready to be
    /// committed, together with its collect responses and partial-graph barrier info.
    ///
    /// Returns `None` when nothing may complete yet: the upstream has not committed this
    /// job's snapshot epoch, or the job is `Resetting`.
    ///
    /// The `bool` in the returned tuple is `true` iff the returned epoch is the finish
    /// epoch of a `Finishing` job; in that case the epoch is acked immediately and the
    /// barrier control is expected to be drained.
    pub(super) fn start_completing(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(
        u64,
        Vec<BarrierCompleteResponse>,
        PartialGraphBarrierInfo,
        bool,
    )> {
        // do not commit snapshot backfill job until upstream has committed the snapshot epoch
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        // Derive (a) the finish epoch, if the job is finishing, and (b) the exclusive
        // upper bound of epochs allowed to start completing in this call.
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            // The upstream has advanced past the finish epoch, so all
                            // remaining epochs of this job may complete.
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                // Never complete at or beyond the minimum inflight upstream epoch.
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(..) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control
            .start_completing(epoch_end_bound, |epoch| {
                partial_graph_manager.take_collected_barrier(self.partial_graph_id, epoch)
            })
            .map(|(epoch, resps, info)| {
                let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
                    // A finishing job must not carry a checkpoint-triggering command.
                    assert!(!info.post_collect_command.should_checkpoint());
                    if epoch == finish_at_epoch {
                        // Ack the finish epoch right away; after it, no barrier may
                        // remain under control of this job.
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                (epoch, resps, info, is_finish_epoch)
            })
    }

    /// Mark `completed_epoch` as fully completed in the barrier control.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    /// Iterate over all inflight fragment infos exposed by the current status, each
    /// paired with this job's id.
    pub fn fragment_infos_with_job_id(
        &self,
    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
        self.status
            .fragment_infos()
            .into_iter()
            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
    }

    /// Insert into `blocked_fragment_ids` every fragment of this creating job that
    /// contains an online unreschedulable scan, together with that fragment's upstream
    /// fragment ids.
    ///
    /// No-op when the current status exposes no creating-job info.
    pub(super) fn collect_reschedule_blocked_fragment_ids(
        &self,
        blocked_fragment_ids: &mut HashSet<FragmentId>,
    ) {
        let Some(info) = self.status.creating_job_info() else {
            return;
        };

        for (fragment_id, fragment) in &info.fragment_infos {
            if fragment_has_online_unreschedulable_scan(fragment) {
                blocked_fragment_ids.insert(*fragment_id);
                collect_fragment_upstream_fragment_ids(fragment, blocked_fragment_ids);
            }
        }
    }

    /// Append all `(upstream_fragment_id, downstream_fragment_id)` pairs connected via a
    /// `NoShuffle` dispatcher, gathered from both `upstream_fragment_downstreams` and the
    /// job-internal `downstreams` maps.
    ///
    /// No-op when the current status exposes no creating-job info.
    pub(super) fn collect_no_shuffle_fragment_relations(
        &self,
        no_shuffle_relations: &mut Vec<(FragmentId, FragmentId)>,
    ) {
        let Some(info) = self.status.creating_job_info() else {
            return;
        };

        // Relations keyed by fragments upstream of this creating job.
        for (upstream_fragment_id, downstreams) in &info.upstream_fragment_downstreams {
            no_shuffle_relations.extend(
                downstreams
                    .iter()
                    .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
                    .map(|downstream| (*upstream_fragment_id, downstream.downstream_fragment_id)),
            );
        }

        // Relations keyed by fragments of the creating job itself.
        for (fragment_id, downstreams) in &info.downstreams {
            no_shuffle_relations.extend(
                downstreams
                    .iter()
                    .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
                    .map(|downstream| (*fragment_id, downstream.downstream_fragment_id)),
            );
        }
    }

    /// Consume this control and return the tracking job carried by the `Finishing` status.
    ///
    /// # Panics
    /// Panics if any barrier is still under control, or if the job has not reached the
    /// `Finishing` status.
    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Resetting(..)
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }

    /// Handle the response to a partial-graph reset: notify every notifier queued in the
    /// `Resetting` status as collected.
    ///
    /// # Panics
    /// Panics if the job is not in the `Resetting` status.
    pub(super) fn on_partial_graph_reset(mut self) {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(notifiers) => {
                for notifier in notifiers.drain(..) {
                    notifier.notify_collected();
                }
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                panic!(
                    "should be resetting when receiving reset partial graph resp, but at {:?}",
                    self.status
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Drop a creating snapshot backfill job by directly resetting the partial graph.
    /// Return `false` if the partial graph has been merged to upstream database, and `true` otherwise
    /// to mean that the job has been dropped.
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> bool {
        match &mut self.status {
            // Already resetting: mark the new notifiers as started and queue them behind
            // the existing ones.
            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                // Kick off the reset and hold the notifiers until the reset response
                // arrives (see `on_partial_graph_reset`).
                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
                true
            }
            // Finishing means the job has been merged to the upstream database already.
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Consume this control on reset. Returns `true` when the job was in the `Resetting`
    /// status (its pending notifiers are notified as collected), and `false` for the
    /// other active statuses.
    pub(super) fn reset(self) -> bool {
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::Resetting(notifiers) => {
                for notifier in notifiers {
                    notifier.notify_collected();
                }
                true
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}
976}