risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{ActorId, FragmentId};
32use risingwave_pb::stream_plan::AddMutation;
33use risingwave_pb::stream_plan::barrier::PbBarrierKind;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use status::CreatingStreamingJobStatus;
37use tracing::{debug, info};
38
39use crate::MetaResult;
40use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
41use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
42use crate::barrier::edge_builder::FragmentEdgeBuildResult;
43use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
44use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
45use crate::barrier::rpc::ControlStreamManager;
46use crate::barrier::{BackfillOrderState, BarrierKind, CreateStreamingJobCommandInfo, TracedEpoch};
47use crate::controller::fragment::InflightFragmentInfo;
48use crate::model::StreamJobActorsToCreate;
49use crate::rpc::metrics::GLOBAL_META_METRICS;
50use crate::stream::build_actor_connector_splits;
51
/// Control state of a streaming job that is still being created via snapshot backfill.
///
/// It owns the barrier bookkeeping of the job's own partial graph (`barrier_control`) and the
/// phase the job is currently in (`status`).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to; passed along on every barrier injection.
    database_id: DatabaseId,
    /// Id of the creating job; also used as the partial graph id and the metric label.
    pub(super) job_id: JobId,
    /// Definition of the job, reported as the `statement` of its `DdlProgress`.
    definition: String,
    /// Create type reported in `DdlProgress`. Recovered jobs are always `Background`.
    create_type: CreateType,
    /// Upstream tables the job backfills from; returned by `should_merge_to_upstream`.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the backfilled snapshot. Used as the lower bound of the job's progress epoch.
    backfill_epoch: u64,

    /// Actors to collect barriers from, grouped by worker.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of the job's fragments, as reported by `InflightFragmentInfo::existing_table_ids`.
    state_table_ids: HashSet<TableId>,

    /// Tracks the barriers injected for this job until they are collected and completed.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase of the job (consuming snapshot, consuming log store, or finishing).
    status: CreatingStreamingJobStatus,

    /// `snapshot_backfill_lag` gauge of this job, updated on every new upstream barrier.
    upstream_lag: LabelGuardedIntGauge,
}
69
70impl CreatingStreamingJobControl {
    /// Creates the control state for a newly created snapshot backfill job and injects its
    /// first barrier.
    ///
    /// The job is registered as a partial graph on `control_stream_manager`, and the first
    /// barrier (a fake checkpoint barrier) carries the actors to create together with an `Add`
    /// mutation. The returned control starts in the `ConsumingSnapshot` status.
    ///
    /// # Errors
    ///
    /// Propagates the error returned when injecting the initial barrier.
    ///
    /// # Panics
    ///
    /// Panics if `info.cdc_table_snapshot_splits` is set, or if there are still pending
    /// non-checkpoint barriers after the initial barrier has been injected.
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Nodes that have backfill dependencies are sent in the initial mutation as
        // `backfill_nodes_to_pause`.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            info.definition.clone(),
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [], // no subscribers for newly creating job
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        // The job's own barriers use fake epochs, starting from physical time 0.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // The partial graph is registered before the first barrier is injected into it.
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        // NOTE(review): the initial barrier is a checkpoint barrier, which presumably drains
        // the pending list inside `new_fake_barrier` — confirm against `status.rs`.
        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                fragment_infos,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        })
    }
202
203    fn resolve_upstream_log_epochs(
204        snapshot_backfill_upstream_tables: &HashSet<TableId>,
205        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
206        committed_epoch: u64,
207        upstream_curr_epoch: u64,
208    ) -> MetaResult<Vec<BarrierInfo>> {
209        // TODO: add sanity check
210        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
211        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
212        loop {
213            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
214            if *checkpoint_epoch < committed_epoch {
215                continue;
216            }
217            assert_eq!(*checkpoint_epoch, committed_epoch);
218            break;
219        }
220        let mut ret = vec![];
221        let mut prev_epoch = committed_epoch;
222        let mut pending_non_checkpoint_barriers = vec![];
223        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
224            for (i, epoch) in non_checkpoint_epochs
225                .iter()
226                .chain([checkpoint_epoch])
227                .enumerate()
228            {
229                assert!(*epoch > prev_epoch);
230                pending_non_checkpoint_barriers.push(prev_epoch);
231                ret.push(BarrierInfo {
232                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
233                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
234                    kind: if i == 0 {
235                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
236                    } else {
237                        BarrierKind::Barrier
238                    },
239                });
240                prev_epoch = *epoch;
241            }
242        }
243        ret.push(BarrierInfo {
244            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
245            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
246            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
247        });
248        Ok(ret)
249    }
250
251    fn recover_consuming_snapshot(
252        job_id: JobId,
253        definition: &String,
254        snapshot_backfill_upstream_tables: &HashSet<TableId>,
255        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
256        backfill_epoch: u64,
257        committed_epoch: u64,
258        upstream_curr_epoch: u64,
259        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
260        version_stat: &HummockVersionStats,
261    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
262        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
263        let mut pending_non_checkpoint_barriers = vec![];
264        let create_mview_tracker = CreateMviewProgressTracker::recover(
265            job_id,
266            definition.clone(),
267            &fragment_infos,
268            Default::default(),
269            version_stat,
270        );
271        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
272            &mut prev_epoch_fake_physical_time,
273            &mut pending_non_checkpoint_barriers,
274            PbBarrierKind::Initial,
275        );
276        Ok((
277            CreatingStreamingJobStatus::ConsumingSnapshot {
278                prev_epoch_fake_physical_time,
279                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
280                    snapshot_backfill_upstream_tables,
281                    upstream_table_log_epochs,
282                    backfill_epoch,
283                    upstream_curr_epoch,
284                )?,
285                version_stats: version_stat.clone(),
286                create_mview_tracker,
287                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
288                    &fragment_infos,
289                )
290                .collect(),
291                fragment_infos,
292                backfill_epoch,
293                pending_non_checkpoint_barriers,
294            },
295            barrier_info,
296        ))
297    }
298
299    fn recover_consuming_log_store(
300        job_id: JobId,
301        snapshot_backfill_upstream_tables: &HashSet<TableId>,
302        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
303        committed_epoch: u64,
304        upstream_curr_epoch: u64,
305        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
306    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
307        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
308            snapshot_backfill_upstream_tables,
309            upstream_table_log_epochs,
310            committed_epoch,
311            upstream_curr_epoch,
312        )?;
313        let mut first_barrier = barriers_to_inject.remove(0);
314        assert!(first_barrier.kind.is_checkpoint());
315        first_barrier.kind = BarrierKind::Initial;
316        Ok((
317            CreatingStreamingJobStatus::ConsumingLogStore {
318                tracking_job: TrackingJob::recovered(job_id, &fragment_infos),
319                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
320                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos),
321                    barriers_to_inject
322                        .last()
323                        .map(|info| info.prev_epoch() - committed_epoch)
324                        .unwrap_or(0),
325                ),
326                barriers_to_inject: Some(barriers_to_inject),
327                fragment_infos,
328            },
329            first_barrier,
330        ))
331    }
332
333    #[expect(clippy::too_many_arguments)]
334    pub(crate) fn recover(
335        database_id: DatabaseId,
336        job_id: JobId,
337        definition: String,
338        snapshot_backfill_upstream_tables: HashSet<TableId>,
339        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
340        backfill_epoch: u64,
341        committed_epoch: u64,
342        upstream_curr_epoch: u64,
343        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
344        version_stat: &HummockVersionStats,
345        new_actors: StreamJobActorsToCreate,
346        initial_mutation: Mutation,
347        control_stream_manager: &mut ControlStreamManager,
348    ) -> MetaResult<Self> {
349        debug!(
350            %job_id,
351            definition,
352            "recovered creating job"
353        );
354        let mut barrier_control =
355            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
356
357        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
358        let state_table_ids =
359            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
360
361        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
362            Self::recover_consuming_snapshot(
363                job_id,
364                &definition,
365                &snapshot_backfill_upstream_tables,
366                upstream_table_log_epochs,
367                backfill_epoch,
368                committed_epoch,
369                upstream_curr_epoch,
370                fragment_infos,
371                version_stat,
372            )?
373        } else {
374            Self::recover_consuming_log_store(
375                job_id,
376                &snapshot_backfill_upstream_tables,
377                upstream_table_log_epochs,
378                committed_epoch,
379                upstream_curr_epoch,
380                fragment_infos,
381            )?
382        };
383        control_stream_manager.add_partial_graph(database_id, Some(job_id));
384
385        Self::inject_barrier(
386            database_id,
387            job_id,
388            control_stream_manager,
389            &mut barrier_control,
390            &node_actors,
391            Some(&state_table_ids),
392            first_barrier_info,
393            Some(new_actors),
394            Some(initial_mutation),
395        )?;
396        Ok(Self {
397            database_id,
398            job_id,
399            definition,
400            create_type: CreateType::Background,
401            snapshot_backfill_upstream_tables,
402            backfill_epoch,
403            node_actors,
404            state_table_ids,
405            barrier_control,
406            status,
407            upstream_lag: GLOBAL_META_METRICS
408                .snapshot_backfill_lag
409                .with_guarded_label_values(&[&format!("{}", job_id)]),
410        })
411    }
412
    /// Returns whether the job's barrier control reports itself as empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }
416
417    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
418        self.barrier_control.is_valid_after_worker_err(worker_id)
419            && self
420                .status
421                .fragment_infos()
422                .map(|fragment_infos| {
423                    InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
424                })
425                .unwrap_or(true)
426    }
427
428    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
429        let progress = match &self.status {
430            CreatingStreamingJobStatus::ConsumingSnapshot {
431                create_mview_tracker,
432                ..
433            } => {
434                if create_mview_tracker.is_finished() {
435                    "Snapshot finished".to_owned()
436                } else {
437                    let progress = create_mview_tracker.gen_ddl_progress();
438                    format!("Snapshot [{}]", progress.progress)
439                }
440            }
441            CreatingStreamingJobStatus::ConsumingLogStore {
442                log_store_progress_tracker,
443                ..
444            } => {
445                format!(
446                    "LogStore [{}]",
447                    log_store_progress_tracker.gen_ddl_progress()
448                )
449            }
450            CreatingStreamingJobStatus::Finishing(..) => {
451                format!(
452                    "Finishing [epoch count: {}]",
453                    self.barrier_control.inflight_barrier_count()
454                )
455            }
456            CreatingStreamingJobStatus::PlaceHolder => {
457                unreachable!()
458            }
459        };
460        DdlProgress {
461            id: self.job_id.as_raw_id() as u64,
462            statement: self.definition.clone(),
463            create_type: self.create_type.as_str().to_owned(),
464            progress,
465        }
466    }
467
468    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
469        if self.status.is_finishing() {
470            None
471        } else {
472            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
473            Some(max(
474                self.barrier_control.max_collected_epoch().unwrap_or(0),
475                self.backfill_epoch,
476            ))
477        }
478    }
479
    /// Injects `barrier_info` into the partial graph of `job_id` and records it in
    /// `barrier_control`.
    ///
    /// `state_table_ids`, `new_actors` and `mutation` are optional extras forwarded with the
    /// barrier; a `None` table set is forwarded as an empty iterator.
    ///
    /// # Errors
    ///
    /// Returns the error of `ControlStreamManager::inject_barrier`; in that case nothing is
    /// enqueued in `barrier_control`.
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            node_actors,
            state_table_ids.into_iter().flatten().copied(),
            new_actors,
        )?;
        // Only a successfully injected barrier is tracked, keyed by its prev epoch.
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }
507
    /// Switches the job to consuming the upstream directly and injects `barrier_info` into the
    /// job's partial graph.
    ///
    /// The barrier is injected without state table ids, new actors or a mutation. Returns the
    /// fragment infos handed back by the status transition.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`Self::inject_barrier`]. The status transition has already
    /// happened by then.
    pub(super) fn start_consume_upstream(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<HashMap<FragmentId, InflightFragmentInfo>> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        // The status is transitioned first; the barrier is injected afterwards.
        let fragment_infos = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.database_id,
            self.job_id,
            control_stream_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            None,
        )?;
        Ok(fragment_infos)
    }
532
533    pub(super) fn on_new_upstream_barrier(
534        &mut self,
535        control_stream_manager: &mut ControlStreamManager,
536        barrier_info: &BarrierInfo,
537    ) -> MetaResult<()> {
538        let progress_epoch =
539            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
540                max(max_collected_epoch, self.backfill_epoch)
541            } else {
542                self.backfill_epoch
543            };
544        self.upstream_lag.set(
545            barrier_info
546                .prev_epoch
547                .value()
548                .0
549                .saturating_sub(progress_epoch) as _,
550        );
551        {
552            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
553                Self::inject_barrier(
554                    self.database_id,
555                    self.job_id,
556                    control_stream_manager,
557                    &mut self.barrier_control,
558                    &self.node_actors,
559                    Some(&self.state_table_ids),
560                    barrier_to_inject,
561                    None,
562                    mutation,
563                )?;
564            }
565        }
566        Ok(())
567    }
568
    /// Processes a barrier-complete response of this job.
    ///
    /// Updates the status with the reported create-mview progress, then hands the response to
    /// the barrier control. Returns `true` when the job should now be merged into the upstream,
    /// i.e. when [`Self::should_merge_to_upstream`] returns `Some`.
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }
574
575    pub(super) fn should_merge_to_upstream(&self) -> Option<&HashSet<TableId>> {
576        if let CreatingStreamingJobStatus::ConsumingLogStore {
577            log_store_progress_tracker,
578            barriers_to_inject,
579            ..
580        } = &self.status
581            && barriers_to_inject.is_none()
582            && log_store_progress_tracker.is_finished()
583        {
584            Some(&self.snapshot_backfill_upstream_tables)
585        } else {
586            None
587        }
588    }
589}
590
/// Classifies an epoch that `CreatingStreamingJobControl::start_completing` hands out for
/// completion.
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    /// Any barrier that is neither the first commit nor the finishing epoch.
    Normal,
    /// The last barrier to complete
    Finished,
}
598
599impl CreatingStreamingJobControl {
600    pub(super) fn start_completing(
601        &mut self,
602        min_upstream_inflight_epoch: Option<u64>,
603    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
604        let (finished_at_epoch, epoch_end_bound) = match &self.status {
605            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
606                let epoch_end_bound = min_upstream_inflight_epoch
607                    .map(|upstream_epoch| {
608                        if upstream_epoch < *finish_at_epoch {
609                            Excluded(upstream_epoch)
610                        } else {
611                            Unbounded
612                        }
613                    })
614                    .unwrap_or(Unbounded);
615                (Some(*finish_at_epoch), epoch_end_bound)
616            }
617            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
618            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
619                None,
620                min_upstream_inflight_epoch
621                    .map(Excluded)
622                    .unwrap_or(Unbounded),
623            ),
624            CreatingStreamingJobStatus::PlaceHolder => {
625                unreachable!()
626            }
627        };
628        self.barrier_control.start_completing(epoch_end_bound).map(
629            |(epoch, resps, is_first_commit)| {
630                let status = if let Some(finish_at_epoch) = finished_at_epoch {
631                    assert!(!is_first_commit);
632                    if epoch == finish_at_epoch {
633                        self.barrier_control.ack_completed(epoch);
634                        assert!(self.barrier_control.is_empty());
635                        CompleteJobType::Finished
636                    } else {
637                        CompleteJobType::Normal
638                    }
639                } else if is_first_commit {
640                    CompleteJobType::First
641                } else {
642                    CompleteJobType::Normal
643                };
644                (epoch, resps, status)
645            },
646        )
647    }
648
    /// Acknowledges to the barrier control that `completed_epoch` has been completed.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }
652
653    pub fn is_consuming(&self) -> bool {
654        match &self.status {
655            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
656            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
657            CreatingStreamingJobStatus::Finishing(..) => false,
658            CreatingStreamingJobStatus::PlaceHolder => {
659                unreachable!()
660            }
661        }
662    }
663
    /// Returns the ids of the state tables belonging to this job's fragments.
    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }
667
668    pub fn fragment_infos_with_job_id(
669        &self,
670    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
671        self.status
672            .fragment_infos()
673            .into_iter()
674            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
675    }
676
677    pub fn into_tracking_job(self) -> TrackingJob {
678        assert!(self.barrier_control.is_empty());
679        match self.status {
680            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
681            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
682            | CreatingStreamingJobStatus::PlaceHolder => {
683                unreachable!("expect finish")
684            }
685            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
686        }
687    }
688}