1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22use std::time::Duration;
23
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::ddl_service::PbBackfillType;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_plan::{AddMutation, StopMutation};
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use status::CreatingStreamingJobStatus;
37use tracing::{debug, info};
38
39use super::super::state::RenderResult;
40use super::IndependentCheckpointJobControl;
41use crate::MetaResult;
42use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
43use crate::barrier::checkpoint::independent_job::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
44use crate::barrier::checkpoint::independent_job::creating_job::status::CreateMviewLogStoreProgressTracker;
45use crate::barrier::command::PostCollectCommand;
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::FragmentEdgeBuildResult;
48use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{
51 CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
52};
53use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
54use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
55use crate::barrier::{
56 BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
57 TracedEpoch,
58};
59use crate::controller::fragment::InflightFragmentInfo;
60use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::stream::source_manager::SplitAssignment;
63use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
64
/// Fragment, actor and relation information of a snapshot-backfill job that is being created.
///
/// Built either in `CreatingStreamingJobControl::new` (fresh creation) or in
/// `CreatingStreamingJobControl::recover` (after meta recovery).
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// In-flight information of every fragment that belongs to the creating job.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Relations whose upstream fragment is outside the job and whose downstream fragment is
    /// inside it (this is how `recover` computes it).
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Outgoing relations of the job's own fragments.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables the job reads via snapshot backfill.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Every actor of the job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
73
/// Barrier-level control state of a snapshot-backfill streaming job while it is being created.
///
/// The job runs in its own partial graph (`partial_graph_id`) until it is ready to be merged
/// into its upstream database graph.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    /// Partial graph id derived from the database id and `job_id` via `to_partial_graph_id`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables read through snapshot backfill; their log epochs stay pinned while the
    /// job is creating (see `pinned_upstream_log_epoch`).
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Upstream epoch whose snapshot the job backfills.
    snapshot_epoch: u64,

    /// Actors to collect barriers from, grouped by worker.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of the job that are synced on non-finishing barriers.
    state_table_ids: HashSet<TableId>,

    /// Largest epoch acked via `ack_completed`; `None` for a freshly created job that has not
    /// completed any epoch yet, and the committed epoch for a recovered job.
    max_committed_epoch: Option<u64>,
    /// Current phase of the creating job.
    status: CreatingStreamingJobStatus,

    /// Gauge set on each upstream barrier to the (saturating) difference between the upstream
    /// `prev_epoch` value and the job's progress epoch; raw epoch values, not milliseconds.
    upstream_lag: LabelGuardedIntGauge,
}
89
90impl CreatingStreamingJobControl {
    /// Creates the control state of a newly created snapshot-backfill job and registers it.
    ///
    /// The job is inserted into `entry` with a `PlaceHolder` status. A new partial graph is then
    /// registered with `partial_graph_manager`. Finally, the job's first barrier is injected: a
    /// fake checkpoint barrier built by `new_fake_barrier`, carrying the actors to create and an
    /// `Add` mutation.
    ///
    /// On success the status becomes `ConsumingSnapshot` and a mutable reference to the
    /// inserted job is returned. If injecting the first barrier fails, the partial graph adder is
    /// marked failed and the job, which stays inserted in the map, is set to `Resetting` with no
    /// notifiers. The injection error is then returned.
    ///
    /// # Panics
    /// Panics if `create_info.info.cdc_table_snapshot_splits` is `Some`.
    pub(crate) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, IndependentCheckpointJobControl>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
        actors: &RenderResult,
    ) -> MetaResult<&'a mut Self> {
        // `create_info` is moved into the first barrier below, so work on a copy of its info.
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(
                &actors.stream_actors,
                &actors.actor_location,
                split_assignment,
            )
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Nodes returned by `get_nodes_with_backfill_dependencies`; they are sent as
        // `backfill_nodes_to_pause` in the initial `Add` mutation.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &fragment_infos,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create = Command::create_streaming_job_actors_to_create(
            &info,
            edges,
            &actors.stream_actors,
            &actors.actor_location,
        );

        // Fake epochs for the snapshot phase start from physical time 0.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors: Vec<ActorId> = actors
            .stream_actors
            .values()
            .flatten()
            .map(|actor| actor.actor_id)
            .collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        // No dispatchers, subscriptions or upstream sinks are set in this mutation.
        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Insert first so that `job` can be returned with lifetime `'a`; the variant we just
        // inserted is the only one that can come back, hence `unreachable!`.
        let IndependentCheckpointJobControl::CreatingStreamingJob(job) = entry.insert(
            IndependentCheckpointJobControl::CreatingStreamingJob(Self {
                partial_graph_id,
                job_id,
                snapshot_backfill_upstream_tables,
                max_committed_epoch: None,
                snapshot_epoch,
                // Replaced below on both the success and the failure path.
                status: CreatingStreamingJobStatus::PlaceHolder,
                upstream_lag: GLOBAL_META_METRICS
                    .snapshot_backfill_lag
                    .with_guarded_label_values(&[&format!("{}", job_id)]),
                node_actors,
                state_table_ids,
            }),
        ) else {
            unreachable!()
        };

        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        );

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &job.node_actors,
            &job.state_table_ids,
            false,
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            // The job stays in the map, but in `Resetting` with no pending notifiers.
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            // The first barrier is a checkpoint, so no non-checkpoint epoch may be pending.
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams,
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: actors
                    .stream_actors
                    .values()
                    .flatten()
                    .map(|actor| (actor.actor_id, actor.clone()))
                    .collect(),
            };
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }
254
255 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
256 match &self.status {
257 CreatingStreamingJobStatus::ConsumingSnapshot {
258 create_mview_tracker,
259 info,
260 ..
261 } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
262 CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
263 collect_done_fragments(self.job_id, &info.fragment_infos)
264 }
265 CreatingStreamingJobStatus::Finishing(_, _)
266 | CreatingStreamingJobStatus::Resetting(_)
267 | CreatingStreamingJobStatus::PlaceHolder => vec![],
268 }
269 }
270
271 fn resolve_upstream_log_epochs(
272 snapshot_backfill_upstream_tables: &HashSet<TableId>,
273 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
274 exclusive_start_log_epoch: u64,
275 upstream_barrier_info: &BarrierInfo,
276 ) -> MetaResult<Vec<BarrierInfo>> {
277 let table_id = snapshot_backfill_upstream_tables
278 .iter()
279 .next()
280 .expect("snapshot backfill job should have upstream");
281 let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
282 let mut epochs_iter = epochs.iter();
283 loop {
284 let (_, checkpoint_epoch) =
285 epochs_iter.next().expect("not reach committed epoch yet");
286 if *checkpoint_epoch < exclusive_start_log_epoch {
287 continue;
288 }
289 assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
290 break;
291 }
292 epochs_iter
293 } else {
294 assert_eq!(
296 upstream_barrier_info.prev_epoch(),
297 exclusive_start_log_epoch
298 );
299 static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
300 EMPTY_VEC.iter()
301 };
302
303 let mut ret = vec![];
304 let mut prev_epoch = exclusive_start_log_epoch;
305 let mut pending_non_checkpoint_barriers = vec![];
306 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
307 for (i, epoch) in non_checkpoint_epochs
308 .iter()
309 .chain([checkpoint_epoch])
310 .enumerate()
311 {
312 assert!(*epoch > prev_epoch);
313 pending_non_checkpoint_barriers.push(prev_epoch);
314 ret.push(BarrierInfo {
315 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
316 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
317 kind: if i == 0 {
318 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
319 } else {
320 BarrierKind::Barrier
321 },
322 });
323 prev_epoch = *epoch;
324 }
325 }
326 ret.push(BarrierInfo {
327 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
328 curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
329 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
330 });
331 Ok(ret)
332 }
333
334 fn recover_consuming_snapshot(
335 job_id: JobId,
336 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
337 snapshot_epoch: u64,
338 committed_epoch: u64,
339 upstream_barrier_info: &BarrierInfo,
340 info: CreatingJobInfo,
341 backfill_order_state: BackfillOrderState,
342 version_stat: &HummockVersionStats,
343 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
344 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
345 let mut pending_non_checkpoint_barriers = vec![];
346 let create_mview_tracker = CreateMviewProgressTracker::recover(
347 job_id,
348 &info.fragment_infos,
349 backfill_order_state,
350 version_stat,
351 );
352 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
353 &mut prev_epoch_fake_physical_time,
354 &mut pending_non_checkpoint_barriers,
355 PbBarrierKind::Initial,
356 );
357
358 Ok((
359 CreatingStreamingJobStatus::ConsumingSnapshot {
360 prev_epoch_fake_physical_time,
361 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
362 &info.snapshot_backfill_upstream_tables,
363 upstream_table_log_epochs,
364 snapshot_epoch,
365 upstream_barrier_info,
366 )?,
367 version_stats: version_stat.clone(),
368 create_mview_tracker,
369 snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
370 &info.fragment_infos,
371 )
372 .collect(),
373 info,
374 snapshot_epoch,
375 pending_non_checkpoint_barriers,
376 },
377 barrier_info,
378 ))
379 }
380
381 fn recover_consuming_log_store(
382 job_id: JobId,
383 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
384 committed_epoch: u64,
385 upstream_barrier_info: &BarrierInfo,
386 info: CreatingJobInfo,
387 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
388 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
389 &info.snapshot_backfill_upstream_tables,
390 upstream_table_log_epochs,
391 committed_epoch,
392 upstream_barrier_info,
393 )?;
394 let mut first_barrier = barriers_to_inject.remove(0);
395 assert!(first_barrier.kind.is_checkpoint());
396 first_barrier.kind = BarrierKind::Initial;
397
398 Ok((
399 CreatingStreamingJobStatus::ConsumingLogStore {
400 tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
401 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
402 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
403 barriers_to_inject
404 .last()
405 .map(|info| info.prev_epoch() - committed_epoch)
406 .unwrap_or(0),
407 ),
408 barriers_to_inject: Some(barriers_to_inject),
409 info,
410 },
411 first_barrier,
412 ))
413 }
414
415 #[expect(clippy::too_many_arguments)]
416 pub(crate) fn recover(
417 database_id: DatabaseId,
418 job_id: JobId,
419 snapshot_backfill_upstream_tables: HashSet<TableId>,
420 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
421 snapshot_epoch: u64,
422 committed_epoch: u64,
423 upstream_barrier_info: &BarrierInfo,
424 fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
425 backfill_order: ExtendedFragmentBackfillOrder,
426 fragment_relations: &FragmentDownstreamRelation,
427 version_stat: &HummockVersionStats,
428 new_actors: StreamJobActorsToCreate,
429 initial_mutation: Mutation,
430 partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
431 ) -> MetaResult<Self> {
432 info!(
433 %job_id,
434 "recovered creating snapshot backfill job"
435 );
436
437 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
438 let state_table_ids: HashSet<_> =
439 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
440
441 let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
442 for (upstream_fragment_id, downstreams) in fragment_relations {
443 if fragment_infos.contains_key(upstream_fragment_id) {
444 continue;
445 }
446 for downstream in downstreams {
447 if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
448 upstream_fragment_downstreams
449 .entry(*upstream_fragment_id)
450 .or_default()
451 .push(downstream.clone());
452 }
453 }
454 }
455 let downstreams = fragment_infos
456 .keys()
457 .filter_map(|fragment_id| {
458 fragment_relations
459 .get(fragment_id)
460 .map(|relation| (*fragment_id, relation.clone()))
461 })
462 .collect();
463
464 let info = CreatingJobInfo {
465 fragment_infos,
466 upstream_fragment_downstreams,
467 downstreams,
468 snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
469 stream_actors: new_actors
470 .values()
471 .flat_map(|fragments| {
472 fragments.values().flat_map(|(_, actors, _)| {
473 actors
474 .iter()
475 .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
476 })
477 })
478 .collect(),
479 };
480
481 let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
482 let locality_fragment_state_table_mapping =
483 build_locality_fragment_state_table_mapping(&info.fragment_infos);
484 let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
485 &backfill_order,
486 &info.fragment_infos,
487 locality_fragment_state_table_mapping,
488 );
489 Self::recover_consuming_snapshot(
490 job_id,
491 upstream_table_log_epochs,
492 snapshot_epoch,
493 committed_epoch,
494 upstream_barrier_info,
495 info,
496 backfill_order_state,
497 version_stat,
498 )?
499 } else {
500 Self::recover_consuming_log_store(
501 job_id,
502 upstream_table_log_epochs,
503 committed_epoch,
504 upstream_barrier_info,
505 info,
506 )?
507 };
508
509 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
510
511 partial_graph_recoverer.recover_graph(
512 partial_graph_id,
513 initial_mutation,
514 &first_barrier_info,
515 &node_actors,
516 state_table_ids.iter().copied(),
517 new_actors,
518 CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
519 )?;
520
521 Ok(Self {
522 job_id,
523 partial_graph_id,
524 snapshot_backfill_upstream_tables,
525 snapshot_epoch,
526 node_actors,
527 state_table_ids,
528 max_committed_epoch: Some(committed_epoch),
529 status,
530 upstream_lag: GLOBAL_META_METRICS
531 .snapshot_backfill_lag
532 .with_guarded_label_values(&[&format!("{}", job_id)]),
533 })
534 }
535
536 pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
537 let progress = match &self.status {
538 CreatingStreamingJobStatus::ConsumingSnapshot {
539 create_mview_tracker,
540 ..
541 } => {
542 if create_mview_tracker.is_finished() {
543 "Snapshot finished".to_owned()
544 } else {
545 let progress = create_mview_tracker.gen_backfill_progress();
546 format!("Snapshot [{}]", progress)
547 }
548 }
549 CreatingStreamingJobStatus::ConsumingLogStore {
550 log_store_progress_tracker,
551 ..
552 } => {
553 format!(
554 "LogStore [{}]",
555 log_store_progress_tracker.gen_backfill_progress()
556 )
557 }
558 CreatingStreamingJobStatus::Finishing(finish_epoch, ..) => {
559 let committed_epoch = self.max_committed_epoch.expect("should have committed");
560 let lag = Duration::from_millis(
561 Epoch(*finish_epoch).physical_time() - Epoch(committed_epoch).physical_time(),
562 );
563 format!("Finishing [epoch lag: {lag:?}]",)
564 }
565 CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
566 CreatingStreamingJobStatus::PlaceHolder => {
567 unreachable!()
568 }
569 };
570 BackfillProgress {
571 progress,
572 backfill_type: PbBackfillType::SnapshotBackfill,
573 }
574 }
575
576 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
577 (
578 max(self.max_committed_epoch.unwrap_or(0), self.snapshot_epoch),
579 self.snapshot_backfill_upstream_tables.clone(),
580 )
581 }
582
583 fn inject_barrier(
584 partial_graph_id: PartialGraphId,
585 partial_graph_manager: &mut PartialGraphManager,
586 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
587 state_table_ids: &HashSet<TableId>,
588 is_finishing: bool,
589 barrier_info: BarrierInfo,
590 new_actors: Option<StreamJobActorsToCreate>,
591 mutation: Option<Mutation>,
592 notifiers: Vec<Notifier>,
593 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
594 ) -> MetaResult<()> {
595 let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
596 (Some(state_table_ids), Some(node_actors.keys().copied()))
597 } else {
598 (None, None)
599 };
600 partial_graph_manager.inject_barrier(
601 partial_graph_id,
602 mutation,
603 node_actors,
604 table_ids_to_sync.into_iter().flatten().copied(),
605 nodes_to_sync_table.into_iter().flatten(),
606 new_actors,
607 PartialGraphBarrierInfo::new(
608 first_create_info.map_or_else(
609 PostCollectCommand::barrier,
610 CreateSnapshotBackfillJobCommandInfo::into_post_collect,
611 ),
612 barrier_info,
613 notifiers,
614 state_table_ids.clone(),
615 ),
616 )?;
617 Ok(())
618 }
619
620 pub(crate) fn start_consume_upstream(
621 &mut self,
622 partial_graph_manager: &mut PartialGraphManager,
623 barrier_info: &BarrierInfo,
624 ) -> MetaResult<CreatingJobInfo> {
625 info!(
626 job_id = %self.job_id,
627 prev_epoch = barrier_info.prev_epoch(),
628 "start consuming upstream"
629 );
630 let info = self.status.start_consume_upstream(barrier_info);
631 Self::inject_barrier(
632 self.partial_graph_id,
633 partial_graph_manager,
634 &self.node_actors,
635 &self.state_table_ids,
636 true,
637 barrier_info.clone(),
638 None,
639 Some(Mutation::Stop(StopMutation {
640 actors: info
642 .fragment_infos
643 .values()
644 .flat_map(|info| info.actors.keys().copied())
645 .collect(),
646 dropped_sink_fragments: vec![], })),
648 vec![], None,
650 )?;
651 Ok(info)
652 }
653
654 pub(crate) fn on_new_upstream_barrier(
655 &mut self,
656 partial_graph_manager: &mut PartialGraphManager,
657 barrier_info: &BarrierInfo,
658 mutation: Option<(Mutation, Vec<Notifier>)>,
659 ) -> MetaResult<()> {
660 let progress_epoch = if let Some(max_committed_epoch) = self.max_committed_epoch {
661 max(max_committed_epoch, self.snapshot_epoch)
662 } else {
663 self.snapshot_epoch
664 };
665 self.upstream_lag.set(
666 barrier_info
667 .prev_epoch
668 .value()
669 .0
670 .saturating_sub(progress_epoch) as _,
671 );
672 let (mut mutation, mut notifiers) = match mutation {
673 Some((mutation, notifiers)) => (Some(mutation), notifiers),
674 None => (None, vec![]),
675 };
676 {
677 for (barrier_to_inject, mutation) in self
678 .status
679 .on_new_upstream_epoch(barrier_info, mutation.take())
680 {
681 Self::inject_barrier(
682 self.partial_graph_id,
683 partial_graph_manager,
684 &self.node_actors,
685 &self.state_table_ids,
686 false,
687 barrier_to_inject,
688 None,
689 mutation,
690 take(&mut notifiers),
691 None,
692 )?;
693 }
694 assert!(mutation.is_none(), "must have consumed mutation");
695 assert!(notifiers.is_empty(), "must consumed notifiers");
696 }
697 Ok(())
698 }
699
700 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
701 self.status.update_progress(
702 collected_barrier
703 .resps
704 .values()
705 .flat_map(|resp| &resp.create_mview_progress),
706 );
707 self.should_merge_to_upstream()
708 }
709
710 pub(crate) fn should_merge_to_upstream(&self) -> bool {
711 if let CreatingStreamingJobStatus::ConsumingLogStore {
712 log_store_progress_tracker,
713 barriers_to_inject,
714 ..
715 } = &self.status
716 && barriers_to_inject.is_none()
717 && log_store_progress_tracker.is_finished()
718 {
719 true
720 } else {
721 false
722 }
723 }
724}
725
726impl CreatingStreamingJobControl {
727 pub(crate) fn start_completing(
728 &mut self,
729 partial_graph_manager: &mut PartialGraphManager,
730 min_upstream_inflight_epoch: Option<u64>,
731 upstream_committed_epoch: u64,
732 ) -> Option<(
733 u64,
734 HashMap<WorkerId, BarrierCompleteResponse>,
735 PartialGraphBarrierInfo,
736 bool,
737 )> {
738 if upstream_committed_epoch < self.snapshot_epoch {
740 return None;
741 }
742 let (finished_at_epoch, epoch_end_bound) = match &self.status {
743 CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
744 let epoch_end_bound = min_upstream_inflight_epoch
745 .map(|upstream_epoch| {
746 if upstream_epoch < *finish_at_epoch {
747 Excluded(upstream_epoch)
748 } else {
749 Unbounded
750 }
751 })
752 .unwrap_or(Unbounded);
753 (Some(*finish_at_epoch), epoch_end_bound)
754 }
755 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
756 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
757 None,
758 min_upstream_inflight_epoch
759 .map(Excluded)
760 .unwrap_or(Unbounded),
761 ),
762 CreatingStreamingJobStatus::Resetting(..) => {
763 return None;
764 }
765 CreatingStreamingJobStatus::PlaceHolder => {
766 unreachable!()
767 }
768 };
769 partial_graph_manager
770 .start_completing(
771 self.partial_graph_id,
772 epoch_end_bound,
773 |non_checkpoint_epoch, _, _| {
774 if let Some(finish_at_epoch) = finished_at_epoch {
775 assert!(non_checkpoint_epoch.prev < finish_at_epoch);
776 }
777 },
778 )
779 .map(|(epoch, resps, info)| {
780 let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
781 assert!(!info.post_collect_command.should_checkpoint());
782 if epoch == finish_at_epoch {
783 self.ack_completed(partial_graph_manager, epoch);
785 true
786 } else {
787 false
788 }
789 } else {
790 false
791 };
792 (epoch, resps, info, is_finish_epoch)
793 })
794 }
795
796 pub(super) fn ack_completed(
797 &mut self,
798 partial_graph_manager: &mut PartialGraphManager,
799 completed_epoch: u64,
800 ) {
801 match &self.status {
802 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
803 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
804 | CreatingStreamingJobStatus::Finishing(_, _) => {
805 partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
806 if let Some(prev_max_committed_epoch) =
807 self.max_committed_epoch.replace(completed_epoch)
808 {
809 assert!(completed_epoch > prev_max_committed_epoch);
810 }
811 }
812 CreatingStreamingJobStatus::Resetting(_) => {
813 }
816 CreatingStreamingJobStatus::PlaceHolder => {
817 unreachable!()
818 }
819 }
820 }
821
822 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
823 self.status.fragment_infos()
824 }
825
826 pub fn into_tracking_job(self) -> TrackingJob {
827 match self.status {
828 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
829 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
830 | CreatingStreamingJobStatus::Resetting(..)
831 | CreatingStreamingJobStatus::PlaceHolder => {
832 unreachable!("expect finish")
833 }
834 CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
835 }
836 }
837
838 pub(super) fn on_partial_graph_reset(mut self) {
839 match &mut self.status {
840 CreatingStreamingJobStatus::Resetting(notifiers) => {
841 for notifier in notifiers.drain(..) {
842 notifier.notify_collected();
843 }
844 }
845 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
846 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
847 | CreatingStreamingJobStatus::Finishing(_, _) => {
848 panic!(
849 "should be resetting when receiving reset partial graph resp, but at {:?}",
850 self.status
851 )
852 }
853 CreatingStreamingJobStatus::PlaceHolder => {
854 unreachable!()
855 }
856 }
857 }
858
859 pub(super) fn drop(
863 &mut self,
864 notifiers: &mut Vec<Notifier>,
865 partial_graph_manager: &mut PartialGraphManager,
866 ) -> bool {
867 match &mut self.status {
868 CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
869 for notifier in &mut *notifiers {
870 notifier.notify_started();
871 }
872 existing_notifiers.append(notifiers);
873 true
874 }
875 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
876 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
877 for notifier in &mut *notifiers {
878 notifier.notify_started();
879 }
880 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
881 self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
882 true
883 }
884 CreatingStreamingJobStatus::Finishing(_, _) => false,
885 CreatingStreamingJobStatus::PlaceHolder => {
886 unreachable!()
887 }
888 }
889 }
890
891 pub(crate) fn reset(self) -> bool {
892 match self.status {
893 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
894 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
895 | CreatingStreamingJobStatus::Finishing(_, _) => false,
896 CreatingStreamingJobStatus::Resetting(notifiers) => {
897 for notifier in notifiers {
898 notifier.notify_collected();
899 }
900 true
901 }
902 CreatingStreamingJobStatus::PlaceHolder => {
903 unreachable!()
904 }
905 }
906 }
907}