risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, FragmentId};
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, StopMutation};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::checkpoint::recovery::ResetPartialGraphCollector;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
use crate::barrier::rpc::{
    ControlStreamManager, build_locality_fragment_state_table_mapping, to_partial_graph_id,
};
use crate::barrier::{
    BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};

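/// Static information about a creating snapshot backfill job: its fragment infos, the downstream
/// relations of its own fragments, the relations from upstream fragments into the job, the
/// upstream tables it snapshot-backfills from, and its stream actors.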
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub downstreams: FragmentDownstreamRelation,
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
}

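/// Per-job barrier state machine for a snapshot backfill job that is still being created.
/// The job runs as its own partial graph: barriers are injected and collected independently of
/// the upstream database graph until the job catches up and is merged back into it.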
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    snapshot_epoch: u64,

    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    state_table_ids: HashSet<TableId>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
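    /// Set up the control state for a newly issued snapshot backfill job: register its partial
    /// graph with the control stream manager and inject the initial checkpoint barrier carrying
    /// the `AddMutation` and the actors to create.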
    pub(super) fn new(
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [], // no subscribers for newly creating job
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, None);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        let job_info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
            downstreams: info.stream_job_fragments.downstreams.clone(),
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: info
                .stream_job_fragments
                .fragments
                .values()
                .flat_map(|fragment| {
                    fragment
                        .actors
                        .iter()
                        .map(|actor| (actor.actor_id, actor.clone()))
                })
                .collect(),
        };

        Ok(Self {
            database_id,
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        })
    }

    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                info,
                ..
            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
                collect_done_fragments(self.job_id, &info.fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(_, _)
            | CreatingStreamingJobStatus::Resetting(_, _)
            | CreatingStreamingJobStatus::PlaceHolder => vec![],
        }
    }

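    /// Resolve, from the upstream table change log, the barriers that still have to be replayed
    /// by the creating job, starting right after `exclusive_start_log_epoch` and ending at the
    /// current upstream barrier.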
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // snapshot backfill job has been marked as creating, but upstream table has not committed a new epoch yet, so no table change log
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

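    /// Rebuild the `ConsumingSnapshot` status during recovery, used when the job had not yet
    /// finished consuming the upstream snapshot (`committed_epoch < snapshot_epoch`): backfill
    /// progress is recovered and the pending upstream barriers are re-derived from the change log.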
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

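    /// Rebuild the `ConsumingLogStore` status during recovery, used when the snapshot has already
    /// been consumed: the remaining upstream epochs are replayed from the change log, with the
    /// first of them turned into an `Initial` barrier.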
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }

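    /// Recover the control state of a creating snapshot backfill job after a meta node failover,
    /// choosing between `ConsumingSnapshot` and `ConsumingLogStore` based on how far the job had
    /// progressed, and re-injecting the first barrier together with the recovered actors.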
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, Some(committed_epoch));

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));

        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
            vec![], // no notifiers in recovery
            None,
        )?;
        Ok(Self {
            database_id,
            job_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && self
                .status
                .fragment_infos()
                .map(|fragment_infos| {
                    !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
                })
                .unwrap_or(true)
    }

    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_backfill_progress();
                    format!("Snapshot [{}]", progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_backfill_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::Resetting(_, _) => "Resetting".to_owned(),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        BackfillProgress {
            progress,
            backfill_type: PbBackfillType::SnapshotBackfill,
        }
    }

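    /// The upstream log epoch that must stay pinned for this job: the larger of the snapshot
    /// epoch and the latest epoch the job has committed.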
    pub(super) fn pinned_upstream_log_epoch(&self) -> u64 {
        max(
            self.barrier_control.max_committed_epoch().unwrap_or(0),
            self.snapshot_epoch,
        )
    }

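    /// Inject a barrier into the job's own partial graph and enqueue it in the barrier control
    /// for later collection. `state_table_ids` is `Some` only when the job's state tables should
    /// also be synced on the involved workers.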
    #[expect(clippy::too_many_arguments)]
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        mut notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        let (state_table_ids, nodes_to_sync_table) = if let Some(state_table_ids) = state_table_ids
        {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            node_actors,
            state_table_ids.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
        )?;
        notifiers.iter_mut().for_each(|n| n.notify_started());
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
            notifiers,
            first_create_info,
        );
        Ok(())
    }

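    /// Switch the job from consuming the log store to consuming the upstream directly: advance
    /// the status and inject a `StopMutation` barrier that stops all of the job's actors.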
    pub(super) fn start_consume_upstream(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.database_id,
            self.job_id,
            control_stream_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // stop all actors
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![], // not related to sink-into-table
            })),
            vec![], // no notifiers when start consuming upstream
            None,
        )?;
        Ok(info)
    }

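    /// Handle a new barrier from the upstream database: update the upstream-lag metric and inject
    /// the corresponding barrier(s) into the job's partial graph as decided by the current status.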
    pub(super) fn on_new_upstream_barrier(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        let progress_epoch =
            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
                max(max_committed_epoch, self.snapshot_epoch)
            } else {
                self.snapshot_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };
        {
            for (barrier_to_inject, mutation) in self
                .status
                .on_new_upstream_epoch(barrier_info, mutation.take())
            {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.node_actors,
                    Some(&self.state_table_ids),
                    barrier_to_inject,
                    None,
                    mutation,
                    take(&mut notifiers),
                    None,
                )?;
            }
            assert!(mutation.is_none(), "must have consumed mutation");
            assert!(notifiers.is_empty(), "must have consumed notifiers");
        }
        Ok(())
    }

    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream()
    }

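    /// Whether the job has caught up with the upstream: all pending log-store barriers have been
    /// injected and log store consumption has finished.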
    pub(super) fn should_merge_to_upstream(&self) -> bool {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            true
        } else {
            false
        }
    }
}

pub(super) enum CompleteJobType {
    /// The first barrier
    First(CreateSnapshotBackfillJobCommandInfo),
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
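    /// Pick the next epoch of this job to start completing, bounded by the upstream's inflight
    /// epochs, and report whether it is the first, a normal, or the final (finishing) barrier.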
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        // do not commit snapshot backfill job until upstream has committed the snapshot epoch
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(_, _) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, create_job_info)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(create_job_info.is_none());
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if let Some(info) = create_job_info {
                    CompleteJobType::First(info)
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..)
            | CreatingStreamingJobStatus::Resetting(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }

    pub fn fragment_infos_with_job_id(
        &self,
    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
        self.status
            .fragment_infos()
            .into_iter()
            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
    }

    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Resetting(_, _)
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }

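    /// Handle a `ResetPartialGraphResponse` while the job is resetting. Returns `true` once all
    /// workers have responded and the pending notifiers have been notified.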
    pub(super) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(collector, notifiers) => {
                collector.collect(worker_id, resp);
                if collector.remaining_workers.is_empty() {
                    for notifier in notifiers.drain(..) {
                        notifier.notify_collected();
                    }
                    true
                } else {
                    false
                }
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                panic!(
                    "should be resetting when receiving reset partial graph resp, but at {:?}",
                    self.status
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Drop a creating snapshot backfill job by directly resetting its partial graph.
    /// Returns `false` if the partial graph has already been merged into the upstream database,
    /// and `true` otherwise, meaning that the job has been dropped.
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(_, existing_notifiers) => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                let remaining_workers =
                    control_stream_manager.reset_partial_graphs(vec![to_partial_graph_id(
                        self.database_id,
                        Some(self.job_id),
                    )]);
                let collector = ResetPartialGraphCollector {
                    remaining_workers,
                    reset_resps: Default::default(),
                };
                self.status = CreatingStreamingJobStatus::Resetting(collector, take(notifiers));
                true
            }
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

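    /// Consume the job and, if it is currently resetting, return the collector of the ongoing
    /// partial-graph reset; `None` for jobs in any other state.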
    pub(super) fn reset(self) -> Option<ResetPartialGraphCollector> {
        match self.status {
            CreatingStreamingJobStatus::Resetting(collector, notifiers) => {
                for notifier in notifiers {
                    notifier.notify_collected();
                }
                Some(collector)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => None,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}