risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, FragmentId};
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, StopMutation};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BackfillOrderState, BarrierKind, CreateStreamingJobCommandInfo, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

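/// Metadata of a snapshot-backfill streaming job that is still being created:
/// its fragment infos, the fragment relations within and around the job, the
/// upstream tables it backfills from, and the stream actors it runs.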
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub downstreams: FragmentDownstreamRelation,
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
}

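/// Barrier-level controller for a single creating snapshot-backfill job.
/// The job runs as its own partial graph: this struct decides which barriers
/// to inject into that graph and tracks their collection and completion via
/// [`CreatingStreamingJobBarrierControl`] and [`CreatingStreamingJobStatus`].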
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    snapshot_epoch: u64,

    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    state_table_ids: HashSet<TableId>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
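    /// Starts a fresh creating job: builds the fragment and progress-tracking
    /// state, registers a new partial graph under `job_id`, and injects the
    /// initial checkpoint barrier carrying an [`AddMutation`].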
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            info.definition.clone(),
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [], // no subscribers for a newly creating job
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, false, None);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // for the mutation of a snapshot backfill job, we won't include changes to the dispatchers of upstream actors
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        let job_info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
            downstreams: info.stream_job_fragments.downstreams.clone(),
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: info
                .stream_job_fragments
                .fragments
                .values()
                .flat_map(|fragment| {
                    fragment
                        .actors
                        .iter()
                        .map(|actor| (actor.actor_id, actor.clone()))
                })
                .collect(),
        };

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        })
    }

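    /// Reconstructs, from the upstream table change log, the barriers that the
    /// creating job still has to replay after `exclusive_start_log_epoch`, and
    /// appends a final checkpoint barrier that catches up to the current
    /// upstream epoch in `upstream_barrier_info`.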
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // The snapshot backfill job has been marked as creating, but the upstream table has
            // not committed a new epoch yet, so there is no table change log to replay.
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

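    /// Rebuilds the `ConsumingSnapshot` status during recovery: the job had not
    /// finished its snapshot yet (`committed_epoch < snapshot_epoch`), so it
    /// restarts with an `Initial` fake barrier and queues the remaining upstream
    /// log epochs as pending upstream barriers.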
    fn recover_consuming_snapshot(
        job_id: JobId,
        definition: &str,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            definition.to_owned(),
            &info.fragment_infos,
            Default::default(),
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

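    /// Rebuilds the `ConsumingLogStore` status during recovery: the snapshot
    /// part is already committed, so the job resumes by replaying the remaining
    /// upstream log epochs, with the first replayed barrier re-marked as
    /// `BarrierKind::Initial`.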
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }

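    /// Recovers a creating job after a meta restart. Depending on how far the
    /// job had progressed (`committed_epoch` vs. `snapshot_epoch`), it resumes
    /// in either the `ConsumingSnapshot` or the `ConsumingLogStore` status,
    /// then re-registers the partial graph and injects the first barrier with
    /// the given `initial_mutation`.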
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control = CreatingStreamingJobBarrierControl::new(
            job_id,
            snapshot_epoch,
            true,
            Some(committed_epoch),
        );

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));

        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

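    /// Returns whether the creating job can survive an error on `worker_id`:
    /// its barrier control must still be valid, and none of its fragments may
    /// have actors running on the failed worker.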
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && self
                .status
                .fragment_infos()
                .map(|fragment_infos| {
                    !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
                })
                .unwrap_or(true)
    }

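    /// Renders the DDL progress shown to users, e.g. `Snapshot [..]`,
    /// `LogStore [..]`, or `Finishing [..]`, depending on the current status.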
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_ddl_progress();
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

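    /// The upstream log epoch this job pins: the maximum committed epoch of
    /// its own barriers, but at least `snapshot_epoch`.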
    pub(super) fn pinned_upstream_log_epoch(&self) -> u64 {
        max(
            self.barrier_control.max_committed_epoch().unwrap_or(0),
            self.snapshot_epoch,
        )
    }

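    /// Injects a single barrier into the job's partial graph and enqueues its
    /// `prev_epoch` into `barrier_control` to await collection.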
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            node_actors,
            state_table_ids.into_iter().flatten().copied(),
            new_actors,
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

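    /// Switches the job from consuming the log store to consuming the upstream
    /// directly: hands the job's graph info back to the caller and injects a
    /// final barrier carrying a [`StopMutation`] that stops all actors of the
    /// job's own partial graph.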
    pub(super) fn start_consume_upstream(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.database_id,
            self.job_id,
            control_stream_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // stop all actors
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![], // not related to sink-into-table
            })),
        )?;
        Ok(info)
    }

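    /// Reacts to a new barrier in the upstream database graph: updates the
    /// upstream-lag metric and injects whatever barriers the current status
    /// derives from this upstream epoch.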
    pub(super) fn on_new_upstream_barrier(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let progress_epoch =
            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
                max(max_committed_epoch, self.snapshot_epoch)
            } else {
                self.snapshot_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.node_actors,
                Some(&self.state_table_ids),
                barrier_to_inject,
                None,
                mutation,
            )?;
        }
        Ok(())
    }

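    /// Collects a barrier-complete response from a worker, updates backfill
    /// progress, and returns whether the job is now ready to merge back into
    /// the upstream graph.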
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream()
    }

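    /// The job is ready to merge back into the upstream graph once it is
    /// consuming the log store, has no more barriers queued for injection, and
    /// log store consumption has caught up.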
    pub(super) fn should_merge_to_upstream(&self) -> bool {
        matches!(
            &self.status,
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                barriers_to_inject,
                ..
            } if barriers_to_inject.is_none() && log_store_progress_tracker.is_finished()
        )
    }
}

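/// How a completed epoch of a creating job relates to the job's lifecycle.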
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
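    /// Picks the next collected epoch to commit, bounded by the minimum
    /// inflight upstream epoch so that the creating job never commits ahead of
    /// its upstream. Returns the epoch, its responses, and whether this is the
    /// first, a normal, or the final commit of the job.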
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }

    pub fn fragment_infos_with_job_id(
        &self,
    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
        self.status
            .fragment_infos()
            .into_iter()
            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
    }

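    /// Consumes the finished control and yields the [`TrackingJob`] carried by
    /// the `Finishing` status; panics if the job has not finished yet.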
    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }
}