1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::{DispatcherType, WorkerId};
30use risingwave_pb::ddl_service::PbBackfillType;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::barrier::PbBarrierKind;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{AddMutation, StopMutation};
37use risingwave_pb::stream_service::BarrierCompleteResponse;
38use status::CreatingStreamingJobStatus;
39use tracing::{debug, info};
40
41use crate::MetaResult;
42use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
43use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
44use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
45use crate::barrier::edge_builder::FragmentEdgeBuildResult;
46use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphRecoverer};
49use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
50use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
51use crate::barrier::{
52 BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::InflightFragmentInfo;
55use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
56use crate::rpc::metrics::GLOBAL_META_METRICS;
57use crate::stream::source_manager::SplitAssignment;
58use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
59
/// Static information about a creating (snapshot-backfill) streaming job,
/// captured when the job is first created and also rebuilt during recovery.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// Inflight info of every fragment belonging to this creating job.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Downstream relations whose upstream fragment is *outside* this job
    /// (edges entering the job).
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Downstream relations whose upstream fragment is *inside* this job.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables the snapshot backfill reads from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// All actors of the job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
68
/// Barrier-level controller for a single creating snapshot-backfill job.
/// Drives the job through the phases tracked by [`CreatingStreamingJobStatus`]
/// (consuming snapshot, consuming log store, finishing, resetting), injecting
/// barriers into the job's dedicated partial graph.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    // Partial graph dedicated to this job, derived from (database_id, job_id).
    partial_graph_id: PartialGraphId,
    // Upstream tables the snapshot backfill reads from.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    // Epoch at which the upstream snapshot was taken.
    snapshot_epoch: u64,

    // Actors of this job to collect barriers from, grouped by worker node.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    // All state table ids owned by this job.
    state_table_ids: HashSet<TableId>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    // Gauge reporting how far this job lags behind the upstream epoch.
    upstream_lag: LabelGuardedIntGauge,
}
84
85fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
86 let mut has_unreschedulable_scan = false;
87 visit_stream_node_cont(&fragment.nodes, |node| {
88 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
89 let scan_type = stream_scan.stream_scan_type();
90 if !scan_type.is_reschedulable(true) {
91 has_unreschedulable_scan = true;
92 return false;
93 }
94 }
95 true
96 });
97 has_unreschedulable_scan
98}
99
100fn collect_fragment_upstream_fragment_ids(
101 fragment: &InflightFragmentInfo,
102 upstream_fragment_ids: &mut HashSet<FragmentId>,
103) {
104 visit_stream_node_cont(&fragment.nodes, |node| {
105 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
106 upstream_fragment_ids.insert(merge.upstream_fragment_id);
107 }
108 true
109 });
110}
111
112impl CreatingStreamingJobControl {
    /// Creates a new creating snapshot-backfill job and inserts it into `entry`.
    ///
    /// Builds the job's initial `Add` mutation and a fake `Checkpoint` barrier,
    /// registers a dedicated partial graph for the job, and injects the first
    /// barrier into it. On injection failure the partial-graph registration is
    /// rolled back and the job is left in `Resetting`; on success the job
    /// enters `ConsumingSnapshot`.
    pub(super) fn new<'a>(
        entry: hash_map::VacantEntry<'a, JobId, Self>,
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        partial_graph_manager: &mut PartialGraphManager,
        edges: &mut FragmentEdgeBuildResult,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<&'a mut Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        // Backfill nodes with dependencies start paused; they are tracked by the
        // backfill order state.
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        // NOTE(review): empty fourth tuple element — confirm its
                        // meaning against `collect_actors_to_create`.
                        [],
                    )
                },
            ));

        let barrier_control = CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, None);

        // The snapshot phase runs on fake epochs generated from a private
        // physical clock starting at 0.
        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        // The first barrier carries an `Add` mutation that brings the new
        // actors (with their source splits) online.
        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // Each creating job gets its own partial graph keyed by (database, job).
        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Insert with a PlaceHolder status first; the real status is filled in
        // below once the first barrier has been injected successfully.
        let job = entry.insert(Self {
            partial_graph_id,
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::PlaceHolder,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        });

        let mut graph_adder = partial_graph_manager.add_partial_graph(partial_graph_id);

        if let Err(e) = Self::inject_barrier(
            partial_graph_id,
            graph_adder.manager(),
            &mut job.barrier_control,
            &job.node_actors,
            Some(&job.state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        ) {
            // Roll back the partial-graph registration and park the job in
            // Resetting so it can be cleaned up.
            graph_adder.failed();
            job.status = CreatingStreamingJobStatus::Resetting(vec![]);
            Err(e)
        } else {
            graph_adder.added();
            // The initial barrier was a checkpoint, so nothing may be pending.
            assert!(pending_non_checkpoint_barriers.is_empty());
            let job_info = CreatingJobInfo {
                fragment_infos,
                upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
                downstreams: info.stream_job_fragments.downstreams.clone(),
                snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
                stream_actors: info
                    .stream_job_fragments
                    .fragments
                    .values()
                    .flat_map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|actor| (actor.actor_id, actor.clone()))
                    })
                    .collect(),
            };
            job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            };
            Ok(job)
        }
    }
271
272 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
273 match &self.status {
274 CreatingStreamingJobStatus::ConsumingSnapshot {
275 create_mview_tracker,
276 info,
277 ..
278 } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
279 CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
280 collect_done_fragments(self.job_id, &info.fragment_infos)
281 }
282 CreatingStreamingJobStatus::Finishing(_, _)
283 | CreatingStreamingJobStatus::Resetting(_)
284 | CreatingStreamingJobStatus::PlaceHolder => vec![],
285 }
286 }
287
    /// Replays the upstream log epochs after `exclusive_start_log_epoch`,
    /// producing the sequence of `BarrierInfo`s the creating job still has to
    /// consume, terminated by a checkpoint barrier aligned with the current
    /// upstream barrier.
    ///
    /// `upstream_table_log_epochs` maps each table to its logged epochs as
    /// `(non_checkpoint_epochs, checkpoint_epoch)` groups.
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // Only one upstream table is inspected; assumes all upstream tables of
        // the job share the same log epochs — TODO(review): confirm.
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            // Skip groups up to and including `exclusive_start_log_epoch`,
            // which must itself appear as a checkpoint epoch in the log.
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            // No log recorded for the table: the start epoch must already be
            // the upstream barrier's prev epoch, so nothing is replayed.
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                // Epochs must be strictly increasing across the whole log.
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    // NOTE(review): only the *first* barrier of each group is
                    // emitted as a Checkpoint carrying the accumulated pending
                    // epochs, even though the group's checkpoint epoch comes
                    // last in the chain — confirm this is intentional.
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        // Final barrier catches up to the upstream's current epoch and
        // checkpoints whatever is still pending.
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }
350
351 fn recover_consuming_snapshot(
352 job_id: JobId,
353 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
354 snapshot_epoch: u64,
355 committed_epoch: u64,
356 upstream_barrier_info: &BarrierInfo,
357 info: CreatingJobInfo,
358 backfill_order_state: BackfillOrderState,
359 version_stat: &HummockVersionStats,
360 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
361 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
362 let mut pending_non_checkpoint_barriers = vec![];
363 let create_mview_tracker = CreateMviewProgressTracker::recover(
364 job_id,
365 &info.fragment_infos,
366 backfill_order_state,
367 version_stat,
368 );
369 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
370 &mut prev_epoch_fake_physical_time,
371 &mut pending_non_checkpoint_barriers,
372 PbBarrierKind::Initial,
373 );
374
375 Ok((
376 CreatingStreamingJobStatus::ConsumingSnapshot {
377 prev_epoch_fake_physical_time,
378 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
379 &info.snapshot_backfill_upstream_tables,
380 upstream_table_log_epochs,
381 snapshot_epoch,
382 upstream_barrier_info,
383 )?,
384 version_stats: version_stat.clone(),
385 create_mview_tracker,
386 snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
387 &info.fragment_infos,
388 )
389 .collect(),
390 info,
391 snapshot_epoch,
392 pending_non_checkpoint_barriers,
393 },
394 barrier_info,
395 ))
396 }
397
398 fn recover_consuming_log_store(
399 job_id: JobId,
400 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
401 committed_epoch: u64,
402 upstream_barrier_info: &BarrierInfo,
403 info: CreatingJobInfo,
404 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
405 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
406 &info.snapshot_backfill_upstream_tables,
407 upstream_table_log_epochs,
408 committed_epoch,
409 upstream_barrier_info,
410 )?;
411 let mut first_barrier = barriers_to_inject.remove(0);
412 assert!(first_barrier.kind.is_checkpoint());
413 first_barrier.kind = BarrierKind::Initial;
414
415 Ok((
416 CreatingStreamingJobStatus::ConsumingLogStore {
417 tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
418 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
419 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
420 barriers_to_inject
421 .last()
422 .map(|info| info.prev_epoch() - committed_epoch)
423 .unwrap_or(0),
424 ),
425 barriers_to_inject: Some(barriers_to_inject),
426 info,
427 },
428 first_barrier,
429 ))
430 }
431
    /// Recovers the control for a persisted creating snapshot-backfill job
    /// after a meta restart.
    ///
    /// Rebuilds the job's `CreatingJobInfo` from the recovered fragment infos
    /// and relations, resumes in `ConsumingSnapshot` (when the snapshot epoch
    /// is not yet committed) or `ConsumingLogStore` (otherwise), and
    /// re-registers the job's partial graph with the recovered first barrier.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );
        let barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, Some(committed_epoch));

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        // Split the global relations into edges entering the job (upstream
        // fragment outside the job, downstream inside) ...
        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        // ... and edges whose upstream fragment belongs to the job.
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        // Snapshot epoch not committed yet -> still in the snapshot phase;
        // otherwise resume replaying the upstream log store.
        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };

        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));

        // Re-register the job's partial graph with the recovered first barrier.
        partial_graph_recoverer.recover_graph(
            partial_graph_id,
            initial_mutation,
            &first_barrier_info,
            &node_actors,
            state_table_ids.iter().copied(),
            new_actors,
        )?;

        Ok(Self {
            job_id,
            partial_graph_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }
553
554 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
555 self.status
556 .fragment_infos()
557 .map(|fragment_infos| {
558 !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
559 })
560 .unwrap_or(true)
561 }
562
563 pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
564 let progress = match &self.status {
565 CreatingStreamingJobStatus::ConsumingSnapshot {
566 create_mview_tracker,
567 ..
568 } => {
569 if create_mview_tracker.is_finished() {
570 "Snapshot finished".to_owned()
571 } else {
572 let progress = create_mview_tracker.gen_backfill_progress();
573 format!("Snapshot [{}]", progress)
574 }
575 }
576 CreatingStreamingJobStatus::ConsumingLogStore {
577 log_store_progress_tracker,
578 ..
579 } => {
580 format!(
581 "LogStore [{}]",
582 log_store_progress_tracker.gen_backfill_progress()
583 )
584 }
585 CreatingStreamingJobStatus::Finishing(..) => {
586 format!(
587 "Finishing [epoch count: {}]",
588 self.barrier_control.inflight_barrier_count()
589 )
590 }
591 CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
592 CreatingStreamingJobStatus::PlaceHolder => {
593 unreachable!()
594 }
595 };
596 BackfillProgress {
597 progress,
598 backfill_type: PbBackfillType::SnapshotBackfill,
599 }
600 }
601
602 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
603 (
604 max(
605 self.barrier_control.max_committed_epoch().unwrap_or(0),
606 self.snapshot_epoch,
607 ),
608 self.snapshot_backfill_upstream_tables.clone(),
609 )
610 }
611
    /// Injects a single barrier for the creating job into its partial graph
    /// and enqueues the epoch into the job's barrier control.
    ///
    /// When `state_table_ids` is `Some`, every worker node hosting the job's
    /// actors is asked to sync those tables for this barrier. All `notifiers`
    /// are notified as "started" once injection succeeds. `first_create_info`
    /// accompanies only the job's very first barrier.
    fn inject_barrier(
        partial_graph_id: PartialGraphId,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        mut notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        // Sync tables on every node that hosts actors of the job, but only when
        // state tables are provided.
        let (state_table_ids, nodes_to_sync_table) = if let Some(state_table_ids) = state_table_ids
        {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        partial_graph_manager.inject_barrier(
            partial_graph_id,
            mutation,
            &barrier_info,
            node_actors,
            // `Option<..>.into_iter().flatten()` turns None into an empty iterator.
            state_table_ids.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
        )?;
        notifiers.iter_mut().for_each(|n| n.notify_started());
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            barrier_info.kind.clone(),
            notifiers,
            first_create_info,
        );
        Ok(())
    }
648
    /// Transitions the job to consuming upstream directly: injects a `Stop`
    /// mutation covering all actors of the job's fragments, and returns the
    /// job info needed to merge the job into the upstream graph.
    pub(super) fn start_consume_upstream(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.partial_graph_id,
            partial_graph_manager,
            &mut self.barrier_control,
            &self.node_actors,
            // No state tables are passed for this barrier.
            None,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                // Stop every actor of every fragment of this job.
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![],
            })),
            vec![],
            None,
        )?;
        Ok(info)
    }
682
    /// Handles a new barrier arriving from the upstream graph.
    ///
    /// Updates the upstream-lag metric, asks the status which barriers to
    /// inject for this upstream epoch, and injects them. The optional mutation
    /// is attached to the barrier the status chooses; the notifiers go with
    /// the first injected barrier.
    pub(super) fn on_new_upstream_barrier(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        // Progress = max committed epoch, but never before the snapshot epoch.
        let progress_epoch =
            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
                max(max_committed_epoch, self.snapshot_epoch)
            } else {
                self.snapshot_epoch
            };
        // Lag = upstream prev epoch minus our progress (saturating at 0).
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };
        {
            for (barrier_to_inject, mutation) in self
                .status
                .on_new_upstream_epoch(barrier_info, mutation.take())
            {
                Self::inject_barrier(
                    self.partial_graph_id,
                    partial_graph_manager,
                    &mut self.barrier_control,
                    &self.node_actors,
                    Some(&self.state_table_ids),
                    barrier_to_inject,
                    None,
                    mutation,
                    // Notifiers, if any, go with the first injected barrier;
                    // later iterations get an empty vec.
                    take(&mut notifiers),
                    None,
                )?;
            }
            // The status must have consumed the mutation, and the notifiers (if
            // any were given) must have been handed to an injected barrier.
            assert!(mutation.is_none(), "must have consumed mutation");
            assert!(notifiers.is_empty(), "must consumed notifiers");
        }
        Ok(())
    }
729
730 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier) -> bool {
731 self.status.update_progress(
732 collected_barrier
733 .resps
734 .values()
735 .flat_map(|resp| &resp.create_mview_progress),
736 );
737 self.barrier_control.collect(collected_barrier);
738 self.should_merge_to_upstream()
739 }
740
741 pub(super) fn should_merge_to_upstream(&self) -> bool {
742 if let CreatingStreamingJobStatus::ConsumingLogStore {
743 log_store_progress_tracker,
744 barriers_to_inject,
745 ..
746 } = &self.status
747 && barriers_to_inject.is_none()
748 && log_store_progress_tracker.is_finished()
749 {
750 true
751 } else {
752 false
753 }
754 }
755}
756
/// Classifies a completed epoch of a creating job for the committer.
pub(super) enum CompleteJobType {
    /// The job's first barrier; carries the info captured at job creation.
    First(CreateSnapshotBackfillJobCommandInfo),
    /// An ordinary barrier in the middle of the creating process.
    Normal,
    /// The barrier at which the job finishes.
    Finished,
}
764
765impl CreatingStreamingJobControl {
    /// Tries to pop the next collected epoch of this job for committing.
    ///
    /// Returns `None` while the upstream has not committed the snapshot epoch
    /// yet, or while the job is resetting. Otherwise returns the epoch, the
    /// collected worker responses, and how the completion should be treated
    /// (`First` / `Normal` / `Finished`).
    ///
    /// `min_upstream_inflight_epoch`, when present, bounds (exclusively) how
    /// far this job may complete ahead of the upstream's inflight barriers.
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        // Don't complete anything before the upstream has committed the
        // snapshot epoch itself.
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                // While finishing, the upstream bound applies only when it is
                // still below the finish epoch.
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(..) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, create_job_info)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    // A finishing job cannot still carry first-barrier info.
                    assert!(create_job_info.is_none());
                    if epoch == finish_at_epoch {
                        // Last epoch of the job: ack it immediately, leaving
                        // the barrier control empty.
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if let Some(info) = create_job_info {
                    CompleteJobType::First(info)
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }
822
    /// Acknowledges that `completed_epoch` has been committed, releasing it
    /// from the job's barrier control.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }
826
    /// All state table ids owned by this creating job.
    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }
830
831 pub fn fragment_infos_with_job_id(
832 &self,
833 ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
834 self.status
835 .fragment_infos()
836 .into_iter()
837 .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
838 }
839
840 pub(super) fn collect_reschedule_blocked_fragment_ids(
841 &self,
842 blocked_fragment_ids: &mut HashSet<FragmentId>,
843 ) {
844 let Some(info) = self.status.creating_job_info() else {
845 return;
846 };
847
848 for (fragment_id, fragment) in &info.fragment_infos {
849 if fragment_has_online_unreschedulable_scan(fragment) {
850 blocked_fragment_ids.insert(*fragment_id);
851 collect_fragment_upstream_fragment_ids(fragment, blocked_fragment_ids);
852 }
853 }
854 }
855
856 pub(super) fn collect_no_shuffle_fragment_relations(
857 &self,
858 no_shuffle_relations: &mut Vec<(FragmentId, FragmentId)>,
859 ) {
860 let Some(info) = self.status.creating_job_info() else {
861 return;
862 };
863
864 for (upstream_fragment_id, downstreams) in &info.upstream_fragment_downstreams {
865 no_shuffle_relations.extend(
866 downstreams
867 .iter()
868 .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
869 .map(|downstream| (*upstream_fragment_id, downstream.downstream_fragment_id)),
870 );
871 }
872
873 for (fragment_id, downstreams) in &info.downstreams {
874 no_shuffle_relations.extend(
875 downstreams
876 .iter()
877 .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
878 .map(|downstream| (*fragment_id, downstream.downstream_fragment_id)),
879 );
880 }
881 }
882
883 pub fn into_tracking_job(self) -> TrackingJob {
884 assert!(self.barrier_control.is_empty());
885 match self.status {
886 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
887 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
888 | CreatingStreamingJobStatus::Resetting(..)
889 | CreatingStreamingJobStatus::PlaceHolder => {
890 unreachable!("expect finish")
891 }
892 CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
893 }
894 }
895
    /// Handles the reset response of this job's partial graph: the job must be
    /// in `Resetting`, in which case all pending notifiers are notified as
    /// collected and the control is dropped.
    pub(super) fn on_partial_graph_reset(mut self) {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(notifiers) => {
                // Reset is complete: release everyone waiting on it.
                for notifier in notifiers.drain(..) {
                    notifier.notify_collected();
                }
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                panic!(
                    "should be resetting when receiving reset partial graph resp, but at {:?}",
                    self.status
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
916
    /// Requests dropping the creating job. Returns `true` when the drop is (or
    /// already was) initiated via a partial-graph reset, `false` when the job
    /// is in `Finishing` and must be handled through another path.
    ///
    /// All passed notifiers are notified as "started"; on `true` they are
    /// moved into the `Resetting` status to be notified as collected once the
    /// reset response arrives.
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
                // A reset is already in flight: just piggyback the new waiters.
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                // Kick off the reset of the job's partial graph; the reset
                // response is awaited in the Resetting status.
                partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
                self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
                true
            }
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
948
949 pub(super) fn reset(self) -> bool {
950 match self.status {
951 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
952 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
953 | CreatingStreamingJobStatus::Finishing(_, _) => false,
954 CreatingStreamingJobStatus::Resetting(notifiers) => {
955 for notifier in notifiers {
956 notifier.notify_collected();
957 }
958 true
959 }
960 CreatingStreamingJobStatus::PlaceHolder => {
961 unreachable!()
962 }
963 }
964 }
965}