1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22use std::time::Duration;
23
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::ddl_service::PbBackfillType;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_plan::{AddMutation, StopMutation};
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use status::CreatingStreamingJobStatus;
37use tracing::{debug, info};
38
39use super::super::state::RenderResult;
40use super::IndependentCheckpointJobControl;
41use crate::MetaResult;
42use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
43use crate::barrier::checkpoint::independent_job::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
44use crate::barrier::checkpoint::independent_job::creating_job::status::CreateMviewLogStoreProgressTracker;
45use crate::barrier::command::PostCollectCommand;
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::FragmentEdgeBuildResult;
48use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{
51 CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
52};
53use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
54use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
55use crate::barrier::{
56 BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
57 TracedEpoch,
58};
59use crate::controller::fragment::InflightFragmentInfo;
60use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::stream::source_manager::SplitAssignment;
63use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
64
/// Metadata of a snapshot-backfill streaming job that is being created.
///
/// Carried inside [`CreatingStreamingJobStatus`] while the job is consuming
/// its snapshot or the upstream log store, and handed back to the caller when
/// the job starts consuming the upstream directly.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// All fragments of the creating job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Downstream relations of fragments outside the job that point into it.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Downstream relations originating from this job's own fragments.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables that the snapshot backfill reads from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Actors of the creating job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
73
/// Barrier-level controller for a single snapshot-backfill streaming job that
/// checkpoints independently of its upstream database.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    /// Id of the job's dedicated partial graph, derived from
    /// `(database_id, job_id)`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables read by the snapshot backfill.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Upstream epoch at which the backfill snapshot was taken.
    snapshot_epoch: u64,

    /// Actors of the job to collect barriers from, grouped by worker node.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// All state tables owned by the job's fragments.
    state_table_ids: HashSet<TableId>,

    /// Maximum epoch committed for this job so far; `None` before the first
    /// completion is acked.
    max_committed_epoch: Option<u64>,
    /// Current phase of the creating job
    /// (consuming snapshot -> consuming log store -> finishing, or resetting).
    status: CreatingStreamingJobStatus,

    /// Gauge reporting the epoch lag between the upstream barrier and this
    /// job's progress epoch.
    upstream_lag: LabelGuardedIntGauge,
}
89
90impl CreatingStreamingJobControl {
    /// Create and register a new snapshot-backfill streaming job.
    ///
    /// Builds the job's fragment/actor metadata, registers a dedicated partial
    /// graph, and injects the very first (fake) checkpoint barrier carrying the
    /// `Add` mutation that starts the job's actors. On success the entry is
    /// left in the `ConsumingSnapshot` phase; on injection failure it is moved
    /// to `Resetting` and the error is returned.
    pub(crate) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, IndependentCheckpointJobControl>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
        actors: &RenderResult,
    ) -> MetaResult<&'a mut Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        // Build per-fragment inflight info from the rendered actors.
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(
                &actors.stream_actors,
                &actors.actor_location,
                split_assignment,
            )
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Backfill nodes that must start paused because they depend on other
        // backfill nodes finishing first.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &fragment_infos,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create = Command::create_streaming_job_actors_to_create(
            &info,
            edges,
            &actors.stream_actors,
            &actors.actor_location,
        );

        // The job runs on its own fake epoch timeline until it merges back to
        // the upstream; the first fake barrier is a checkpoint.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors: Vec<ActorId> = actors
            .stream_actors
            .values()
            .flatten()
            .map(|actor| actor.actor_id)
            .collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        // The first barrier carries an `Add` mutation that brings up all of
        // the job's actors with their connector split assignments.
        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Insert with a placeholder status first so the entry can be borrowed
        // while injecting the initial barrier below.
        let IndependentCheckpointJobControl::CreatingStreamingJob(job) = entry.insert(
            IndependentCheckpointJobControl::CreatingStreamingJob(Self {
                partial_graph_id,
                job_id,
                snapshot_backfill_upstream_tables,
                max_committed_epoch: None,
                snapshot_epoch,
                status: CreatingStreamingJobStatus::PlaceHolder,
                upstream_lag: GLOBAL_META_METRICS
                    .snapshot_backfill_lag
                    .with_guarded_label_values(&[&format!("{}", job_id)]),
                node_actors,
                state_table_ids,
            }),
        );

        let mut graph_adder = partial_graph_manager.add_partial_graph(
            partial_graph_id,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        );

        // Inject the very first barrier. On failure, mark both the graph
        // registration as failed and the job status as resetting.
        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &job.node_actors,
            &job.state_table_ids,
            false,
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams,
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: actors
                    .stream_actors
                    .values()
                    .flatten()
                    .map(|actor| (actor.actor_id, actor.clone()))
                    .collect(),
            };
            // Replace the placeholder with the real initial status.
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }
252
253 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
254 match &self.status {
255 CreatingStreamingJobStatus::ConsumingSnapshot {
256 create_mview_tracker,
257 info,
258 ..
259 } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
260 CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
261 collect_done_fragments(self.job_id, &info.fragment_infos)
262 }
263 CreatingStreamingJobStatus::Finishing(_, _)
264 | CreatingStreamingJobStatus::Resetting(_)
265 | CreatingStreamingJobStatus::PlaceHolder => vec![],
266 }
267 }
268
    /// Rebuild the [`BarrierInfo`]s the job still has to replay from the
    /// upstream table log.
    ///
    /// The replay starts strictly after `exclusive_start_log_epoch` (which
    /// must appear as an already-consumed checkpoint epoch in the log) and
    /// ends with a final checkpoint barrier at the current upstream barrier's
    /// epoch.
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // Only one (arbitrary) upstream table is consulted; presumably all
        // upstream tables share the same log epochs — NOTE(review): confirm.
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            // Advance past every log entry up to (and including) the entry
            // whose checkpoint epoch equals `exclusive_start_log_epoch`; the
            // remainder of the iterator is what still needs replaying.
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                // The start epoch must itself be a checkpoint epoch in the log.
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // No log recorded for the table: the current upstream barrier must
            // directly follow the start epoch, so there is nothing to replay.
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            // Empty iterator with the same concrete type as the branch above.
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        // Each log entry is (non-checkpoint epochs, checkpoint epoch). The
        // first barrier emitted per entry is a checkpoint carrying the prev
        // epochs accumulated since the previous checkpoint; the rest are
        // plain barriers.
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                // Log epochs must be strictly increasing.
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        // Final checkpoint barrier bridging the last replayed epoch to the
        // current upstream barrier's epoch.
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }
331
    /// Restore a job that had not yet finished its snapshot phase
    /// (`committed_epoch < snapshot_epoch`).
    ///
    /// Rebuilds the `ConsumingSnapshot` status — including the upstream
    /// barriers accumulated while the job was down — and returns the first
    /// (fake, `Initial`) barrier to inject.
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        // Resume the fake epoch timeline from the last committed epoch.
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                // Upstream barriers from the snapshot epoch up to the current
                // upstream barrier still have to be consumed afterwards.
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }
378
    /// Restore a job that had already finished its snapshot phase and was
    /// replaying the upstream log store.
    ///
    /// Barriers from `committed_epoch` up to the current upstream barrier are
    /// resolved from the table log; the first one is converted into the
    /// `Initial` barrier and returned, the rest are queued for injection.
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        // The first resolved barrier (always a checkpoint) becomes the job's
        // initial barrier after recovery.
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    // Initial lag: distance from the committed epoch to the
                    // last barrier pending injection (0 when none remain).
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }
412
    /// Recover the control of a creating snapshot-backfill job after a meta
    /// node restart.
    ///
    /// Depending on how far the job progressed before the failure
    /// (`committed_epoch` vs `snapshot_epoch`), the job is restored into
    /// either the `ConsumingSnapshot` or the `ConsumingLogStore` phase, and
    /// its partial graph is re-registered with the first barrier to inject.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // Relations whose upstream fragment lies outside the job but whose
        // downstream fragment belongs to it.
        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                // Internal edge: the upstream fragment is part of the job itself.
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        // Relations originating from the job's own fragments.
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        // Not yet past the snapshot epoch: resume consuming the snapshot;
        // otherwise resume replaying the upstream log store.
        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Re-register the job's partial graph and schedule the first barrier.
        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &node_actors,
            state_table_ids.iter().copied(),
            new_actors,
            CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            max_committed_epoch: Some(committed_epoch),
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }
533
534 pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
535 let progress = match &self.status {
536 CreatingStreamingJobStatus::ConsumingSnapshot {
537 create_mview_tracker,
538 ..
539 } => {
540 if create_mview_tracker.is_finished() {
541 "Snapshot finished".to_owned()
542 } else {
543 let progress = create_mview_tracker.gen_backfill_progress();
544 format!("Snapshot [{}]", progress)
545 }
546 }
547 CreatingStreamingJobStatus::ConsumingLogStore {
548 log_store_progress_tracker,
549 ..
550 } => {
551 format!(
552 "LogStore [{}]",
553 log_store_progress_tracker.gen_backfill_progress()
554 )
555 }
556 CreatingStreamingJobStatus::Finishing(finish_epoch, ..) => {
557 let committed_epoch = self.max_committed_epoch.expect("should have committed");
558 let lag = Duration::from_millis(
559 Epoch(*finish_epoch).physical_time() - Epoch(committed_epoch).physical_time(),
560 );
561 format!("Finishing [epoch lag: {lag:?}]",)
562 }
563 CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
564 CreatingStreamingJobStatus::PlaceHolder => {
565 unreachable!()
566 }
567 };
568 BackfillProgress {
569 progress,
570 backfill_type: PbBackfillType::SnapshotBackfill,
571 }
572 }
573
574 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
575 (
576 max(self.max_committed_epoch.unwrap_or(0), self.snapshot_epoch),
577 self.snapshot_backfill_upstream_tables.clone(),
578 )
579 }
580
    /// Inject a single barrier into this job's partial graph.
    ///
    /// * `is_finishing` — when `true`, no table-sync targets are attached to
    ///   the barrier.
    /// * `new_actors` — actors to build along with this barrier (only for the
    ///   very first barrier of a new job).
    /// * `first_create_info` — present only on the first barrier; converted
    ///   into the create-job post-collect command, otherwise a plain barrier
    ///   command is used.
    fn inject_barrier(
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: &HashSet<TableId>,
        is_finishing: bool,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        // No state tables or nodes are synced on the finishing barrier.
        let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        partial_graph_manager.inject_barrier(
            partial_graph_id,
            mutation,
            node_actors,
            table_ids_to_sync.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
            PartialGraphBarrierInfo::new(
                first_create_info.map_or_else(
                    PostCollectCommand::barrier,
                    CreateSnapshotBackfillJobCommandInfo::into_post_collect,
                ),
                barrier_info,
                notifiers,
                state_table_ids.clone(),
            ),
        )?;
        Ok(())
    }
617
    /// Transition the job to consuming the upstream directly.
    ///
    /// Injects the finishing barrier (`is_finishing = true`) carrying a `Stop`
    /// mutation for all of the job's actors, and returns the job's
    /// [`CreatingJobInfo`] extracted from the status.
    pub(crate) fn start_consume_upstream(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.partial_graph_id,
            partial_graph_manager,
            &self.node_actors,
            &self.state_table_ids,
            true,
            barrier_info.clone(),
            None,
            // Stop all actors of the creating job.
            Some(Mutation::Stop(StopMutation {
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![],
            })),
            vec![],
            None,
        )?;
        Ok(info)
    }
651
652 pub(crate) fn on_new_upstream_barrier(
653 &mut self,
654 partial_graph_manager: &mut PartialGraphManager,
655 barrier_info: &BarrierInfo,
656 mutation: Option<(Mutation, Vec<Notifier>)>,
657 ) -> MetaResult<()> {
658 let progress_epoch = if let Some(max_committed_epoch) = self.max_committed_epoch {
659 max(max_committed_epoch, self.snapshot_epoch)
660 } else {
661 self.snapshot_epoch
662 };
663 self.upstream_lag.set(
664 barrier_info
665 .prev_epoch
666 .value()
667 .0
668 .saturating_sub(progress_epoch) as _,
669 );
670 let (mut mutation, mut notifiers) = match mutation {
671 Some((mutation, notifiers)) => (Some(mutation), notifiers),
672 None => (None, vec![]),
673 };
674 {
675 for (barrier_to_inject, mutation) in self
676 .status
677 .on_new_upstream_epoch(barrier_info, mutation.take())
678 {
679 Self::inject_barrier(
680 self.partial_graph_id,
681 partial_graph_manager,
682 &self.node_actors,
683 &self.state_table_ids,
684 false,
685 barrier_to_inject,
686 None,
687 mutation,
688 take(&mut notifiers),
689 None,
690 )?;
691 }
692 assert!(mutation.is_none(), "must have consumed mutation");
693 assert!(notifiers.is_empty(), "must consumed notifiers");
694 }
695 Ok(())
696 }
697
698 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
699 self.status.update_progress(
700 collected_barrier
701 .resps
702 .values()
703 .flat_map(|resp| &resp.create_mview_progress),
704 );
705 self.should_merge_to_upstream()
706 }
707
708 pub(crate) fn should_merge_to_upstream(&self) -> bool {
709 if let CreatingStreamingJobStatus::ConsumingLogStore {
710 log_store_progress_tracker,
711 barriers_to_inject,
712 ..
713 } = &self.status
714 && barriers_to_inject.is_none()
715 && log_store_progress_tracker.is_finished()
716 {
717 true
718 } else {
719 false
720 }
721 }
722}
723
724impl CreatingStreamingJobControl {
    /// Try to start completing (committing) collected barriers of this job.
    ///
    /// Returns `None` when nothing can be completed yet; otherwise the epoch
    /// being completed, the per-worker barrier responses, the barrier info,
    /// and whether this epoch is the job's finish epoch.
    pub(crate) fn start_completing(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(
        u64,
        HashMap<WorkerId, BarrierCompleteResponse>,
        PartialGraphBarrierInfo,
        bool,
    )> {
        // Nothing can be committed before the upstream has committed the
        // snapshot epoch the job was built from.
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        // Upper bound of epochs that may be completed: never at or beyond the
        // smallest upstream inflight epoch, and bounded by the finish epoch
        // when the job is finishing.
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(..) => {
                // Nothing to complete while the job is being torn down.
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        partial_graph_manager
            .start_completing(
                self.partial_graph_id,
                epoch_end_bound,
                |non_checkpoint_epoch, _, _| {
                    // Every non-checkpoint epoch must lie strictly before the
                    // finish epoch.
                    if let Some(finish_at_epoch) = finished_at_epoch {
                        assert!(non_checkpoint_epoch.prev < finish_at_epoch);
                    }
                },
            )
            .map(|(epoch, resps, info)| {
                let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!info.post_collect_command.should_checkpoint());
                    if epoch == finish_at_epoch {
                        // The finish epoch is acked immediately here rather
                        // than by the caller.
                        self.ack_completed(partial_graph_manager, epoch);
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                (epoch, resps, info, is_finish_epoch)
            })
    }
793
    /// Acknowledge that `completed_epoch` has been committed for this job,
    /// forwarding the ack to the partial graph and advancing
    /// `max_committed_epoch`.
    pub(super) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        completed_epoch: u64,
    ) {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                partial_graph_manager.ack_completed(self.partial_graph_id, completed_epoch);
                // Committed epochs must be strictly increasing.
                if let Some(prev_max_committed_epoch) =
                    self.max_committed_epoch.replace(completed_epoch)
                {
                    assert!(completed_epoch > prev_max_committed_epoch);
                }
            }
            CreatingStreamingJobStatus::Resetting(_) => {
                // The job is being torn down: completion acks are ignored.
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
819
820 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
821 self.status.fragment_infos()
822 }
823
824 pub fn into_tracking_job(self) -> TrackingJob {
825 match self.status {
826 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
827 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
828 | CreatingStreamingJobStatus::Resetting(..)
829 | CreatingStreamingJobStatus::PlaceHolder => {
830 unreachable!("expect finish")
831 }
832 CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
833 }
834 }
835
836 pub(super) fn on_partial_graph_reset(mut self) {
837 match &mut self.status {
838 CreatingStreamingJobStatus::Resetting(notifiers) => {
839 for notifier in notifiers.drain(..) {
840 notifier.notify_collected();
841 }
842 }
843 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
844 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
845 | CreatingStreamingJobStatus::Finishing(_, _) => {
846 panic!(
847 "should be resetting when receiving reset partial graph resp, but at {:?}",
848 self.status
849 )
850 }
851 CreatingStreamingJobStatus::PlaceHolder => {
852 unreachable!()
853 }
854 }
855 }
856
    /// Handle a drop request for the creating job.
    ///
    /// Returns `true` when the drop is accepted — the job is (or becomes)
    /// `Resetting` and takes over the notifiers — and `false` when the job is
    /// already `Finishing` and cannot be dropped through this path.
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
                // Already resetting: queue the new notifiers behind the
                // existing ones.
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                // Tear down the job's partial graph; collection is notified
                // once the reset response arrives.
                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
                true
            }
            // A finishing job cannot be aborted here.
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
888
889 pub(crate) fn reset(self) -> bool {
890 match self.status {
891 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
892 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
893 | CreatingStreamingJobStatus::Finishing(_, _) => false,
894 CreatingStreamingJobStatus::Resetting(notifiers) => {
895 for notifier in notifiers {
896 notifier.notify_collected();
897 }
898 true
899 }
900 CreatingStreamingJobStatus::PlaceHolder => {
901 unreachable!()
902 }
903 }
904 }
905}