1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{ActorId, FragmentId};
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_plan::{AddMutation, StopMutation};
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use status::CreatingStreamingJobStatus;
37use tracing::{debug, info};
38
39use crate::MetaResult;
40use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
41use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
42use crate::barrier::edge_builder::FragmentEdgeBuildResult;
43use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
44use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
45use crate::barrier::rpc::ControlStreamManager;
46use crate::barrier::{BackfillOrderState, BarrierKind, CreateStreamingJobCommandInfo, TracedEpoch};
47use crate::controller::fragment::InflightFragmentInfo;
48use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
49use crate::rpc::metrics::GLOBAL_META_METRICS;
50use crate::stream::build_actor_connector_splits;
51
/// Fragment-, relation- and actor-level information of a snapshot backfill job that is still
/// being created.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// Inflight info of the job's own fragments, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Downstream relations keyed by an upstream fragment. On recovery only relations whose
    /// upstream fragment is outside the job and whose downstream fragment belongs to the job
    /// are kept here.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Downstream relations keyed by the job's own fragments.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables of the snapshot backfill job.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// The job's actors, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
60
/// Per-job control state of a streaming job that is being created with snapshot backfill.
///
/// It owns the barrier bookkeeping of the job (`barrier_control`) and the phase the job is
/// currently in (`status`).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to; passed along with every barrier injection.
    database_id: DatabaseId,
    /// Id of the job being created.
    pub(super) job_id: JobId,
    /// Definition of the job; reported as the `statement` of its `DdlProgress`.
    definition: String,
    /// Create type of the job; recovered jobs are always `CreateType::Background`.
    create_type: CreateType,
    /// Upstream tables of the snapshot backfill job.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the snapshot that the job backfills from.
    snapshot_epoch: u64,

    /// Actors to collect barriers from, grouped by worker.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of the job's fragments.
    state_table_ids: HashSet<TableId>,

    /// Bookkeeping of the barriers injected for this job.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase of the job.
    status: CreatingStreamingJobStatus,

    /// Gauge set on every new upstream barrier to the distance between the upstream
    /// `prev_epoch` and the job's progress epoch.
    upstream_lag: LabelGuardedIntGauge,
}
78
79impl CreatingStreamingJobControl {
80 pub(super) fn new(
81 info: &CreateStreamingJobCommandInfo,
82 snapshot_backfill_upstream_tables: HashSet<TableId>,
83 snapshot_epoch: u64,
84 version_stat: &HummockVersionStats,
85 control_stream_manager: &mut ControlStreamManager,
86 edges: &mut FragmentEdgeBuildResult,
87 ) -> MetaResult<Self> {
88 let job_id = info.stream_job_fragments.stream_job_id();
89 let database_id = info.streaming_job.database_id();
90 debug!(
91 %job_id,
92 definition = info.definition,
93 "new creating job"
94 );
95 let fragment_infos = info
96 .stream_job_fragments
97 .new_fragment_info(&info.init_split_assignment)
98 .collect();
99 let snapshot_backfill_actors =
100 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
101 let backfill_nodes_to_pause =
102 get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
103 .into_iter()
104 .collect();
105 let backfill_order_state = BackfillOrderState::new(
106 &info.fragment_backfill_ordering,
107 &info.stream_job_fragments,
108 info.locality_fragment_state_table_mapping.clone(),
109 );
110 let create_mview_tracker = CreateMviewProgressTracker::recover(
111 job_id,
112 info.definition.clone(),
113 &fragment_infos,
114 backfill_order_state,
115 version_stat,
116 );
117
118 let actors_to_create =
119 edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
120 |(fragment_id, node, actors)| {
121 (
122 fragment_id,
123 node,
124 actors,
125 [], )
127 },
128 ));
129
130 let mut barrier_control =
131 CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, false, None);
132
133 let mut prev_epoch_fake_physical_time = 0;
134 let mut pending_non_checkpoint_barriers = vec![];
135
136 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
137 &mut prev_epoch_fake_physical_time,
138 &mut pending_non_checkpoint_barriers,
139 PbBarrierKind::Checkpoint,
140 );
141
142 let added_actors = info.stream_job_fragments.actor_ids().collect();
143 let actor_splits = info
144 .init_split_assignment
145 .values()
146 .flat_map(build_actor_connector_splits)
147 .collect();
148
149 assert!(
150 info.cdc_table_snapshot_splits.is_none(),
151 "should not have cdc backfill for snapshot backfill job"
152 );
153
154 let initial_mutation = Mutation::Add(AddMutation {
155 actor_dispatchers: Default::default(),
157 added_actors,
158 actor_splits,
159 pause: false,
161 subscriptions_to_add: Default::default(),
162 backfill_nodes_to_pause,
163 actor_cdc_table_snapshot_splits: None,
164 new_upstream_sinks: Default::default(),
165 });
166
167 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
168 let state_table_ids =
169 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
170
171 control_stream_manager.add_partial_graph(database_id, Some(job_id));
172 Self::inject_barrier(
173 database_id,
174 job_id,
175 control_stream_manager,
176 &mut barrier_control,
177 &node_actors,
178 Some(&state_table_ids),
179 initial_barrier_info,
180 Some(actors_to_create),
181 Some(initial_mutation),
182 )?;
183
184 assert!(pending_non_checkpoint_barriers.is_empty());
185
186 let job_info = CreatingJobInfo {
187 fragment_infos,
188 upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
189 downstreams: info.stream_job_fragments.downstreams.clone(),
190 snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
191 stream_actors: info
192 .stream_job_fragments
193 .fragments
194 .values()
195 .flat_map(|fragment| {
196 fragment
197 .actors
198 .iter()
199 .map(|actor| (actor.actor_id, actor.clone()))
200 })
201 .collect(),
202 };
203
204 Ok(Self {
205 database_id,
206 definition: info.definition.clone(),
207 create_type: info.create_type.into(),
208 job_id,
209 snapshot_backfill_upstream_tables,
210 barrier_control,
211 snapshot_epoch,
212 status: CreatingStreamingJobStatus::ConsumingSnapshot {
213 prev_epoch_fake_physical_time,
214 pending_upstream_barriers: vec![],
215 version_stats: version_stat.clone(),
216 create_mview_tracker,
217 snapshot_backfill_actors,
218 snapshot_epoch,
219 info: job_info,
220 pending_non_checkpoint_barriers,
221 },
222 upstream_lag: GLOBAL_META_METRICS
223 .snapshot_backfill_lag
224 .with_guarded_label_values(&[&format!("{}", job_id)]),
225 node_actors,
226 state_table_ids,
227 })
228 }
229
230 fn resolve_upstream_log_epochs(
231 snapshot_backfill_upstream_tables: &HashSet<TableId>,
232 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
233 exclusive_start_log_epoch: u64,
234 upstream_barrier_info: &BarrierInfo,
235 ) -> MetaResult<Vec<BarrierInfo>> {
236 let table_id = snapshot_backfill_upstream_tables
237 .iter()
238 .next()
239 .expect("snapshot backfill job should have upstream");
240 let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
241 let mut epochs_iter = epochs.iter();
242 loop {
243 let (_, checkpoint_epoch) =
244 epochs_iter.next().expect("not reach committed epoch yet");
245 if *checkpoint_epoch < exclusive_start_log_epoch {
246 continue;
247 }
248 assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
249 break;
250 }
251 epochs_iter
252 } else {
253 assert_eq!(
255 upstream_barrier_info.prev_epoch(),
256 exclusive_start_log_epoch
257 );
258 static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
259 EMPTY_VEC.iter()
260 };
261
262 let mut ret = vec![];
263 let mut prev_epoch = exclusive_start_log_epoch;
264 let mut pending_non_checkpoint_barriers = vec![];
265 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
266 for (i, epoch) in non_checkpoint_epochs
267 .iter()
268 .chain([checkpoint_epoch])
269 .enumerate()
270 {
271 assert!(*epoch > prev_epoch);
272 pending_non_checkpoint_barriers.push(prev_epoch);
273 ret.push(BarrierInfo {
274 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
275 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
276 kind: if i == 0 {
277 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
278 } else {
279 BarrierKind::Barrier
280 },
281 });
282 prev_epoch = *epoch;
283 }
284 }
285 ret.push(BarrierInfo {
286 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
287 curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
288 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
289 });
290 Ok(ret)
291 }
292
293 fn recover_consuming_snapshot(
294 job_id: JobId,
295 definition: &String,
296 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
297 snapshot_epoch: u64,
298 committed_epoch: u64,
299 upstream_barrier_info: &BarrierInfo,
300 info: CreatingJobInfo,
301 version_stat: &HummockVersionStats,
302 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
303 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
304 let mut pending_non_checkpoint_barriers = vec![];
305 let create_mview_tracker = CreateMviewProgressTracker::recover(
306 job_id,
307 definition.clone(),
308 &info.fragment_infos,
309 Default::default(),
310 version_stat,
311 );
312 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
313 &mut prev_epoch_fake_physical_time,
314 &mut pending_non_checkpoint_barriers,
315 PbBarrierKind::Initial,
316 );
317
318 Ok((
319 CreatingStreamingJobStatus::ConsumingSnapshot {
320 prev_epoch_fake_physical_time,
321 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
322 &info.snapshot_backfill_upstream_tables,
323 upstream_table_log_epochs,
324 snapshot_epoch,
325 upstream_barrier_info,
326 )?,
327 version_stats: version_stat.clone(),
328 create_mview_tracker,
329 snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
330 &info.fragment_infos,
331 )
332 .collect(),
333 info,
334 snapshot_epoch,
335 pending_non_checkpoint_barriers,
336 },
337 barrier_info,
338 ))
339 }
340
341 fn recover_consuming_log_store(
342 job_id: JobId,
343 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
344 committed_epoch: u64,
345 upstream_barrier_info: &BarrierInfo,
346 info: CreatingJobInfo,
347 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
348 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
349 &info.snapshot_backfill_upstream_tables,
350 upstream_table_log_epochs,
351 committed_epoch,
352 upstream_barrier_info,
353 )?;
354 let mut first_barrier = barriers_to_inject.remove(0);
355 assert!(first_barrier.kind.is_checkpoint());
356 first_barrier.kind = BarrierKind::Initial;
357
358 Ok((
359 CreatingStreamingJobStatus::ConsumingLogStore {
360 tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
361 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
362 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
363 barriers_to_inject
364 .last()
365 .map(|info| info.prev_epoch() - committed_epoch)
366 .unwrap_or(0),
367 ),
368 barriers_to_inject: Some(barriers_to_inject),
369 info,
370 },
371 first_barrier,
372 ))
373 }
374
375 #[expect(clippy::too_many_arguments)]
376 pub(crate) fn recover(
377 database_id: DatabaseId,
378 job_id: JobId,
379 definition: String,
380 snapshot_backfill_upstream_tables: HashSet<TableId>,
381 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
382 snapshot_epoch: u64,
383 committed_epoch: u64,
384 upstream_barrier_info: &BarrierInfo,
385 fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
386 fragment_relations: &FragmentDownstreamRelation,
387 version_stat: &HummockVersionStats,
388 new_actors: StreamJobActorsToCreate,
389 initial_mutation: Mutation,
390 control_stream_manager: &mut ControlStreamManager,
391 ) -> MetaResult<Self> {
392 debug!(
393 %job_id,
394 definition,
395 "recovered creating job"
396 );
397 let mut barrier_control = CreatingStreamingJobBarrierControl::new(
398 job_id,
399 snapshot_epoch,
400 true,
401 Some(committed_epoch),
402 );
403
404 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
405 let state_table_ids =
406 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
407
408 let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
409 for (upstream_fragment_id, downstreams) in fragment_relations {
410 if fragment_infos.contains_key(upstream_fragment_id) {
411 continue;
412 }
413 for downstream in downstreams {
414 if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
415 upstream_fragment_downstreams
416 .entry(*upstream_fragment_id)
417 .or_default()
418 .push(downstream.clone());
419 }
420 }
421 }
422 let downstreams = fragment_infos
423 .keys()
424 .filter_map(|fragment_id| {
425 fragment_relations
426 .get(fragment_id)
427 .map(|relation| (*fragment_id, relation.clone()))
428 })
429 .collect();
430
431 let info = CreatingJobInfo {
432 fragment_infos,
433 upstream_fragment_downstreams,
434 downstreams,
435 snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
436 stream_actors: new_actors
437 .values()
438 .flat_map(|fragments| {
439 fragments.values().flat_map(|(_, actors, _)| {
440 actors
441 .iter()
442 .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
443 })
444 })
445 .collect(),
446 };
447
448 let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
449 Self::recover_consuming_snapshot(
450 job_id,
451 &definition,
452 upstream_table_log_epochs,
453 snapshot_epoch,
454 committed_epoch,
455 upstream_barrier_info,
456 info,
457 version_stat,
458 )?
459 } else {
460 Self::recover_consuming_log_store(
461 job_id,
462 upstream_table_log_epochs,
463 committed_epoch,
464 upstream_barrier_info,
465 info,
466 )?
467 };
468 control_stream_manager.add_partial_graph(database_id, Some(job_id));
469
470 Self::inject_barrier(
471 database_id,
472 job_id,
473 control_stream_manager,
474 &mut barrier_control,
475 &node_actors,
476 Some(&state_table_ids),
477 first_barrier_info,
478 Some(new_actors),
479 Some(initial_mutation),
480 )?;
481 Ok(Self {
482 database_id,
483 job_id,
484 definition,
485 create_type: CreateType::Background,
486 snapshot_backfill_upstream_tables,
487 snapshot_epoch,
488 node_actors,
489 state_table_ids,
490 barrier_control,
491 status,
492 upstream_lag: GLOBAL_META_METRICS
493 .snapshot_backfill_lag
494 .with_guarded_label_values(&[&format!("{}", job_id)]),
495 })
496 }
497
    /// Returns whether the job's barrier control is empty; this simply forwards to
    /// `CreatingStreamingJobBarrierControl::is_empty`.
    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }
501
502 pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
503 self.barrier_control.is_valid_after_worker_err(worker_id)
504 && self
505 .status
506 .fragment_infos()
507 .map(|fragment_infos| {
508 InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
509 })
510 .unwrap_or(true)
511 }
512
513 pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
514 let progress = match &self.status {
515 CreatingStreamingJobStatus::ConsumingSnapshot {
516 create_mview_tracker,
517 ..
518 } => {
519 if create_mview_tracker.is_finished() {
520 "Snapshot finished".to_owned()
521 } else {
522 let progress = create_mview_tracker.gen_ddl_progress();
523 format!("Snapshot [{}]", progress.progress)
524 }
525 }
526 CreatingStreamingJobStatus::ConsumingLogStore {
527 log_store_progress_tracker,
528 ..
529 } => {
530 format!(
531 "LogStore [{}]",
532 log_store_progress_tracker.gen_ddl_progress()
533 )
534 }
535 CreatingStreamingJobStatus::Finishing(..) => {
536 format!(
537 "Finishing [epoch count: {}]",
538 self.barrier_control.inflight_barrier_count()
539 )
540 }
541 CreatingStreamingJobStatus::PlaceHolder => {
542 unreachable!()
543 }
544 };
545 DdlProgress {
546 id: self.job_id.as_raw_id() as u64,
547 statement: self.definition.clone(),
548 create_type: self.create_type.as_str().to_owned(),
549 progress,
550 }
551 }
552
553 pub(super) fn pinned_upstream_log_epoch(&self) -> u64 {
554 max(
555 self.barrier_control.max_committed_epoch().unwrap_or(0),
556 self.snapshot_epoch,
557 )
558 }
559
560 fn inject_barrier(
561 database_id: DatabaseId,
562 job_id: JobId,
563 control_stream_manager: &mut ControlStreamManager,
564 barrier_control: &mut CreatingStreamingJobBarrierControl,
565 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
566 state_table_ids: Option<&HashSet<TableId>>,
567 barrier_info: BarrierInfo,
568 new_actors: Option<StreamJobActorsToCreate>,
569 mutation: Option<Mutation>,
570 ) -> MetaResult<()> {
571 let node_to_collect = control_stream_manager.inject_barrier(
572 database_id,
573 Some(job_id),
574 mutation,
575 &barrier_info,
576 node_actors,
577 state_table_ids.into_iter().flatten().copied(),
578 new_actors,
579 )?;
580 barrier_control.enqueue_epoch(
581 barrier_info.prev_epoch(),
582 node_to_collect,
583 barrier_info.kind.clone(),
584 );
585 Ok(())
586 }
587
588 pub(super) fn start_consume_upstream(
589 &mut self,
590 control_stream_manager: &mut ControlStreamManager,
591 barrier_info: &BarrierInfo,
592 ) -> MetaResult<CreatingJobInfo> {
593 info!(
594 job_id = %self.job_id,
595 prev_epoch = barrier_info.prev_epoch(),
596 "start consuming upstream"
597 );
598 let info = self.status.start_consume_upstream(barrier_info);
599 Self::inject_barrier(
600 self.database_id,
601 self.job_id,
602 control_stream_manager,
603 &mut self.barrier_control,
604 &self.node_actors,
605 None,
606 barrier_info.clone(),
607 None,
608 Some(Mutation::Stop(StopMutation {
609 actors: info
611 .fragment_infos
612 .values()
613 .flat_map(|info| info.actors.keys().copied())
614 .collect(),
615 dropped_sink_fragments: vec![], })),
617 )?;
618 Ok(info)
619 }
620
621 pub(super) fn on_new_upstream_barrier(
622 &mut self,
623 control_stream_manager: &mut ControlStreamManager,
624 barrier_info: &BarrierInfo,
625 ) -> MetaResult<()> {
626 let progress_epoch =
627 if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
628 max(max_committed_epoch, self.snapshot_epoch)
629 } else {
630 self.snapshot_epoch
631 };
632 self.upstream_lag.set(
633 barrier_info
634 .prev_epoch
635 .value()
636 .0
637 .saturating_sub(progress_epoch) as _,
638 );
639 {
640 for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
641 Self::inject_barrier(
642 self.database_id,
643 self.job_id,
644 control_stream_manager,
645 &mut self.barrier_control,
646 &self.node_actors,
647 Some(&self.state_table_ids),
648 barrier_to_inject,
649 None,
650 mutation,
651 )?;
652 }
653 }
654 Ok(())
655 }
656
    /// Handles a barrier complete response of the job.
    ///
    /// The backfill progress carried by the response is applied to the status first, then
    /// the response is handed to the barrier control. Returns the result of
    /// [`Self::should_merge_to_upstream`] evaluated afterwards.
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream()
    }
662
663 pub(super) fn should_merge_to_upstream(&self) -> bool {
664 if let CreatingStreamingJobStatus::ConsumingLogStore {
665 log_store_progress_tracker,
666 barriers_to_inject,
667 ..
668 } = &self.status
669 && barriers_to_inject.is_none()
670 && log_store_progress_tracker.is_finished()
671 {
672 true
673 } else {
674 false
675 }
676 }
677}
678
/// Classification of the epoch returned by `CreatingStreamingJobControl::start_completing`.
pub(super) enum CompleteJobType {
    /// The barrier control reported the epoch as the first commit of a job that is still
    /// consuming.
    First,
    /// Any epoch that is neither the first commit nor the epoch the job finishes at.
    Normal,
    /// The epoch is the one the job finishes at; it has already been acked in the barrier
    /// control, which is empty afterwards.
    Finished,
}
686
687impl CreatingStreamingJobControl {
688 pub(super) fn start_completing(
689 &mut self,
690 min_upstream_inflight_epoch: Option<u64>,
691 ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
692 let (finished_at_epoch, epoch_end_bound) = match &self.status {
693 CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
694 let epoch_end_bound = min_upstream_inflight_epoch
695 .map(|upstream_epoch| {
696 if upstream_epoch < *finish_at_epoch {
697 Excluded(upstream_epoch)
698 } else {
699 Unbounded
700 }
701 })
702 .unwrap_or(Unbounded);
703 (Some(*finish_at_epoch), epoch_end_bound)
704 }
705 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
706 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
707 None,
708 min_upstream_inflight_epoch
709 .map(Excluded)
710 .unwrap_or(Unbounded),
711 ),
712 CreatingStreamingJobStatus::PlaceHolder => {
713 unreachable!()
714 }
715 };
716 self.barrier_control.start_completing(epoch_end_bound).map(
717 |(epoch, resps, is_first_commit)| {
718 let status = if let Some(finish_at_epoch) = finished_at_epoch {
719 assert!(!is_first_commit);
720 if epoch == finish_at_epoch {
721 self.barrier_control.ack_completed(epoch);
722 assert!(self.barrier_control.is_empty());
723 CompleteJobType::Finished
724 } else {
725 CompleteJobType::Normal
726 }
727 } else if is_first_commit {
728 CompleteJobType::First
729 } else {
730 CompleteJobType::Normal
731 };
732 (epoch, resps, status)
733 },
734 )
735 }
736
    /// Acknowledges `completed_epoch` as completed by forwarding it to the barrier control.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }
740
741 pub fn is_consuming(&self) -> bool {
742 match &self.status {
743 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
744 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
745 CreatingStreamingJobStatus::Finishing(..) => false,
746 CreatingStreamingJobStatus::PlaceHolder => {
747 unreachable!()
748 }
749 }
750 }
751
    /// Returns the ids of the job's state tables.
    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }
755
756 pub fn fragment_infos_with_job_id(
757 &self,
758 ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
759 self.status
760 .fragment_infos()
761 .into_iter()
762 .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
763 }
764
765 pub fn into_tracking_job(self) -> TrackingJob {
766 assert!(self.barrier_control.is_empty());
767 match self.status {
768 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
769 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
770 | CreatingStreamingJobStatus::PlaceHolder => {
771 unreachable!("expect finish")
772 }
773 CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
774 }
775 }
776}