1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet, hash_map};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::LabelGuardedIntGauge;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::{DispatcherType, WorkerId};
30use risingwave_pb::ddl_service::PbBackfillType;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::barrier::PbBarrierKind;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{AddMutation, StopMutation};
37use risingwave_pb::stream_service::BarrierCompleteResponse;
38use status::CreatingStreamingJobStatus;
39use tracing::{debug, info};
40
41use super::state::RenderResult;
42use crate::MetaResult;
43use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
44use crate::barrier::checkpoint::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
45use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
46use crate::barrier::command::PostCollectCommand;
47use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
48use crate::barrier::edge_builder::FragmentEdgeBuildResult;
49use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
50use crate::barrier::notifier::Notifier;
51use crate::barrier::partial_graph::{
52 CollectedBarrier, PartialGraphBarrierInfo, PartialGraphManager, PartialGraphRecoverer,
53};
54use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
55use crate::barrier::rpc::{build_locality_fragment_state_table_mapping, to_partial_graph_id};
56use crate::barrier::{
57 BackfillOrderState, BackfillProgress, BarrierKind, Command, FragmentBackfillProgress,
58 TracedEpoch,
59};
60use crate::controller::fragment::InflightFragmentInfo;
61use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
62use crate::rpc::metrics::GLOBAL_META_METRICS;
63use crate::stream::source_manager::SplitAssignment;
64use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};
65
/// Description of a snapshot backfill job that is being created: its fragments, the
/// fragment relations that touch them, the upstream tables it backfills from, and its actors.
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    /// Fragments of the creating job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Relations whose upstream fragment is outside the job and whose downstream fragment
    /// belongs to the job (this is how `CreatingStreamingJobControl::recover` rebuilds it).
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Downstream relations of the job's own fragments, keyed by the job's fragment ids.
    pub downstreams: FragmentDownstreamRelation,
    /// Upstream tables that the job snapshot-backfills from.
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Actor definitions of the job, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
}
74
/// Barrier-level control of a snapshot backfill job that is still being created.
///
/// The job runs in its own partial graph; which phase it is in is recorded in `status`.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    job_id: JobId,
    /// Partial graph id derived from the database id and `job_id`.
    partial_graph_id: PartialGraphId,
    /// Upstream tables that the job snapshot-backfills from.
    snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Snapshot epoch of the backfill; `start_completing` does nothing until the upstream
    /// has committed this epoch.
    snapshot_epoch: u64,

    /// Actors whose barrier collection is awaited, grouped by worker.
    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// State tables of the job; synced by every barrier injected before finishing.
    state_table_ids: HashSet<TableId>,

    /// Tracks the epochs injected into and committed by the job's partial graph.
    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    /// Metric: gap between the latest upstream prev epoch and this job's progress epoch.
    upstream_lag: LabelGuardedIntGauge,
}
90
91fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
92 let mut has_unreschedulable_scan = false;
93 visit_stream_node_cont(&fragment.nodes, |node| {
94 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
95 let scan_type = stream_scan.stream_scan_type();
96 if !scan_type.is_reschedulable(true) {
97 has_unreschedulable_scan = true;
98 return false;
99 }
100 }
101 true
102 });
103 has_unreschedulable_scan
104}
105
106fn collect_fragment_upstream_fragment_ids(
107 fragment: &InflightFragmentInfo,
108 upstream_fragment_ids: &mut HashSet<FragmentId>,
109) {
110 visit_stream_node_cont(&fragment.nodes, |node| {
111 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
112 upstream_fragment_ids.insert(merge.upstream_fragment_id);
113 }
114 true
115 });
116}
117
118impl CreatingStreamingJobControl {
119 pub(super) fn new<'a>(
120 entry: hash_map::VacantEntry<'a, JobId, Self>,
121 create_info: CreateSnapshotBackfillJobCommandInfo,
122 notifiers: Vec<Notifier>,
123 snapshot_backfill_upstream_tables: HashSet<TableId>,
124 snapshot_epoch: u64,
125 version_stat: &HummockVersionStats,
126 partial_graph_manager: &mut PartialGraphManager,
127 edges: &mut FragmentEdgeBuildResult,
128 split_assignment: &SplitAssignment,
129 actors: &RenderResult,
130 ) -> MetaResult<&'a mut Self> {
131 let info = create_info.info.clone();
132 let job_id = info.stream_job_fragments.stream_job_id();
133 let database_id = info.streaming_job.database_id();
134 debug!(
135 %job_id,
136 definition = info.definition,
137 "new creating job"
138 );
139 let fragment_infos = info
140 .stream_job_fragments
141 .new_fragment_info(
142 &actors.stream_actors,
143 &actors.actor_location,
144 split_assignment,
145 )
146 .collect();
147 let snapshot_backfill_actors =
148 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
149 let backfill_nodes_to_pause =
150 get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
151 .into_iter()
152 .collect();
153 let backfill_order_state = BackfillOrderState::new(
154 &info.fragment_backfill_ordering,
155 &fragment_infos,
156 info.locality_fragment_state_table_mapping.clone(),
157 );
158 let create_mview_tracker = CreateMviewProgressTracker::recover(
159 job_id,
160 &fragment_infos,
161 backfill_order_state,
162 version_stat,
163 );
164
165 let actors_to_create = Command::create_streaming_job_actors_to_create(
166 &info,
167 edges,
168 &actors.stream_actors,
169 &actors.actor_location,
170 );
171
172 let barrier_control = CreatingStreamingJobBarrierControl::new(job_id, None);
173
174 let mut prev_epoch_fake_physical_time = 0;
175 let mut pending_non_checkpoint_barriers = vec![];
176
177 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
178 &mut prev_epoch_fake_physical_time,
179 &mut pending_non_checkpoint_barriers,
180 PbBarrierKind::Checkpoint,
181 );
182
183 let added_actors: Vec<ActorId> = actors
184 .stream_actors
185 .values()
186 .flatten()
187 .map(|actor| actor.actor_id)
188 .collect();
189 let actor_splits = split_assignment
190 .values()
191 .flat_map(build_actor_connector_splits)
192 .collect();
193
194 assert!(
195 info.cdc_table_snapshot_splits.is_none(),
196 "should not have cdc backfill for snapshot backfill job"
197 );
198
199 let initial_mutation = Mutation::Add(AddMutation {
200 actor_dispatchers: Default::default(),
202 added_actors,
203 actor_splits,
204 pause: false,
206 subscriptions_to_add: Default::default(),
207 backfill_nodes_to_pause,
208 actor_cdc_table_snapshot_splits: None,
209 new_upstream_sinks: Default::default(),
210 });
211
212 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
213 let state_table_ids =
214 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
215
216 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
217
218 let job = entry.insert(Self {
219 partial_graph_id,
220 job_id,
221 snapshot_backfill_upstream_tables,
222 barrier_control,
223 snapshot_epoch,
224 status: CreatingStreamingJobStatus::PlaceHolder, upstream_lag: GLOBAL_META_METRICS
226 .snapshot_backfill_lag
227 .with_guarded_label_values(&[&format!("{}", job_id)]),
228 node_actors,
229 state_table_ids,
230 });
231
232 let mut graph_adder = partial_graph_manager.add_partial_graph(
233 partial_graph_id,
234 CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
235 );
236
237 if let Err(e) = Self::inject_barrier(
238 partial_graph_id,
239 graph_adder.manager(),
240 &mut job.barrier_control,
241 &job.node_actors,
242 &job.state_table_ids,
243 false,
244 initial_barrier_info,
245 Some(actors_to_create),
246 Some(initial_mutation),
247 notifiers,
248 Some(create_info),
249 ) {
250 graph_adder.failed();
251 job.status = CreatingStreamingJobStatus::Resetting(vec![]);
252 Err(e)
253 } else {
254 graph_adder.added();
255 assert!(pending_non_checkpoint_barriers.is_empty());
256 let job_info = CreatingJobInfo {
257 fragment_infos,
258 upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
259 downstreams: info.stream_job_fragments.downstreams,
260 snapshot_backfill_upstream_tables: job.snapshot_backfill_upstream_tables.clone(),
261 stream_actors: actors
262 .stream_actors
263 .values()
264 .flatten()
265 .map(|actor| (actor.actor_id, actor.clone()))
266 .collect(),
267 };
268 job.status = CreatingStreamingJobStatus::ConsumingSnapshot {
269 prev_epoch_fake_physical_time,
270 pending_upstream_barriers: vec![],
271 version_stats: version_stat.clone(),
272 create_mview_tracker,
273 snapshot_backfill_actors,
274 snapshot_epoch,
275 info: job_info,
276 pending_non_checkpoint_barriers,
277 };
278 Ok(job)
279 }
280 }
281
282 pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
283 match &self.status {
284 CreatingStreamingJobStatus::ConsumingSnapshot {
285 create_mview_tracker,
286 info,
287 ..
288 } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
289 CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
290 collect_done_fragments(self.job_id, &info.fragment_infos)
291 }
292 CreatingStreamingJobStatus::Finishing(_, _)
293 | CreatingStreamingJobStatus::Resetting(_)
294 | CreatingStreamingJobStatus::PlaceHolder => vec![],
295 }
296 }
297
298 fn resolve_upstream_log_epochs(
299 snapshot_backfill_upstream_tables: &HashSet<TableId>,
300 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
301 exclusive_start_log_epoch: u64,
302 upstream_barrier_info: &BarrierInfo,
303 ) -> MetaResult<Vec<BarrierInfo>> {
304 let table_id = snapshot_backfill_upstream_tables
305 .iter()
306 .next()
307 .expect("snapshot backfill job should have upstream");
308 let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
309 let mut epochs_iter = epochs.iter();
310 loop {
311 let (_, checkpoint_epoch) =
312 epochs_iter.next().expect("not reach committed epoch yet");
313 if *checkpoint_epoch < exclusive_start_log_epoch {
314 continue;
315 }
316 assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
317 break;
318 }
319 epochs_iter
320 } else {
321 assert_eq!(
323 upstream_barrier_info.prev_epoch(),
324 exclusive_start_log_epoch
325 );
326 static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
327 EMPTY_VEC.iter()
328 };
329
330 let mut ret = vec![];
331 let mut prev_epoch = exclusive_start_log_epoch;
332 let mut pending_non_checkpoint_barriers = vec![];
333 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
334 for (i, epoch) in non_checkpoint_epochs
335 .iter()
336 .chain([checkpoint_epoch])
337 .enumerate()
338 {
339 assert!(*epoch > prev_epoch);
340 pending_non_checkpoint_barriers.push(prev_epoch);
341 ret.push(BarrierInfo {
342 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
343 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
344 kind: if i == 0 {
345 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
346 } else {
347 BarrierKind::Barrier
348 },
349 });
350 prev_epoch = *epoch;
351 }
352 }
353 ret.push(BarrierInfo {
354 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
355 curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
356 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
357 });
358 Ok(ret)
359 }
360
361 fn recover_consuming_snapshot(
362 job_id: JobId,
363 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
364 snapshot_epoch: u64,
365 committed_epoch: u64,
366 upstream_barrier_info: &BarrierInfo,
367 info: CreatingJobInfo,
368 backfill_order_state: BackfillOrderState,
369 version_stat: &HummockVersionStats,
370 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
371 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
372 let mut pending_non_checkpoint_barriers = vec![];
373 let create_mview_tracker = CreateMviewProgressTracker::recover(
374 job_id,
375 &info.fragment_infos,
376 backfill_order_state,
377 version_stat,
378 );
379 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
380 &mut prev_epoch_fake_physical_time,
381 &mut pending_non_checkpoint_barriers,
382 PbBarrierKind::Initial,
383 );
384
385 Ok((
386 CreatingStreamingJobStatus::ConsumingSnapshot {
387 prev_epoch_fake_physical_time,
388 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
389 &info.snapshot_backfill_upstream_tables,
390 upstream_table_log_epochs,
391 snapshot_epoch,
392 upstream_barrier_info,
393 )?,
394 version_stats: version_stat.clone(),
395 create_mview_tracker,
396 snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
397 &info.fragment_infos,
398 )
399 .collect(),
400 info,
401 snapshot_epoch,
402 pending_non_checkpoint_barriers,
403 },
404 barrier_info,
405 ))
406 }
407
408 fn recover_consuming_log_store(
409 job_id: JobId,
410 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
411 committed_epoch: u64,
412 upstream_barrier_info: &BarrierInfo,
413 info: CreatingJobInfo,
414 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
415 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
416 &info.snapshot_backfill_upstream_tables,
417 upstream_table_log_epochs,
418 committed_epoch,
419 upstream_barrier_info,
420 )?;
421 let mut first_barrier = barriers_to_inject.remove(0);
422 assert!(first_barrier.kind.is_checkpoint());
423 first_barrier.kind = BarrierKind::Initial;
424
425 Ok((
426 CreatingStreamingJobStatus::ConsumingLogStore {
427 tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
428 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
429 InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
430 barriers_to_inject
431 .last()
432 .map(|info| info.prev_epoch() - committed_epoch)
433 .unwrap_or(0),
434 ),
435 barriers_to_inject: Some(barriers_to_inject),
436 info,
437 },
438 first_barrier,
439 ))
440 }
441
442 #[expect(clippy::too_many_arguments)]
443 pub(crate) fn recover(
444 database_id: DatabaseId,
445 job_id: JobId,
446 snapshot_backfill_upstream_tables: HashSet<TableId>,
447 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
448 snapshot_epoch: u64,
449 committed_epoch: u64,
450 upstream_barrier_info: &BarrierInfo,
451 fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
452 backfill_order: ExtendedFragmentBackfillOrder,
453 fragment_relations: &FragmentDownstreamRelation,
454 version_stat: &HummockVersionStats,
455 new_actors: StreamJobActorsToCreate,
456 initial_mutation: Mutation,
457 partial_graph_recoverer: &mut PartialGraphRecoverer<'_>,
458 ) -> MetaResult<Self> {
459 info!(
460 %job_id,
461 "recovered creating snapshot backfill job"
462 );
463 let barrier_control =
464 CreatingStreamingJobBarrierControl::new(job_id, Some(committed_epoch));
465
466 let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
467 let state_table_ids: HashSet<_> =
468 InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();
469
470 let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
471 for (upstream_fragment_id, downstreams) in fragment_relations {
472 if fragment_infos.contains_key(upstream_fragment_id) {
473 continue;
474 }
475 for downstream in downstreams {
476 if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
477 upstream_fragment_downstreams
478 .entry(*upstream_fragment_id)
479 .or_default()
480 .push(downstream.clone());
481 }
482 }
483 }
484 let downstreams = fragment_infos
485 .keys()
486 .filter_map(|fragment_id| {
487 fragment_relations
488 .get(fragment_id)
489 .map(|relation| (*fragment_id, relation.clone()))
490 })
491 .collect();
492
493 let info = CreatingJobInfo {
494 fragment_infos,
495 upstream_fragment_downstreams,
496 downstreams,
497 snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
498 stream_actors: new_actors
499 .values()
500 .flat_map(|fragments| {
501 fragments.values().flat_map(|(_, actors, _)| {
502 actors
503 .iter()
504 .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
505 })
506 })
507 .collect(),
508 };
509
510 let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
511 let locality_fragment_state_table_mapping =
512 build_locality_fragment_state_table_mapping(&info.fragment_infos);
513 let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
514 &backfill_order,
515 &info.fragment_infos,
516 locality_fragment_state_table_mapping,
517 );
518 Self::recover_consuming_snapshot(
519 job_id,
520 upstream_table_log_epochs,
521 snapshot_epoch,
522 committed_epoch,
523 upstream_barrier_info,
524 info,
525 backfill_order_state,
526 version_stat,
527 )?
528 } else {
529 Self::recover_consuming_log_store(
530 job_id,
531 upstream_table_log_epochs,
532 committed_epoch,
533 upstream_barrier_info,
534 info,
535 )?
536 };
537
538 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
539
540 partial_graph_recoverer.recover_graph(
541 partial_graph_id,
542 initial_mutation,
543 &first_barrier_info,
544 &node_actors,
545 state_table_ids.iter().copied(),
546 new_actors,
547 CreatingStreamingJobBarrierStats::new(job_id, snapshot_epoch),
548 )?;
549
550 Ok(Self {
551 job_id,
552 partial_graph_id,
553 snapshot_backfill_upstream_tables,
554 snapshot_epoch,
555 node_actors,
556 state_table_ids,
557 barrier_control,
558 status,
559 upstream_lag: GLOBAL_META_METRICS
560 .snapshot_backfill_lag
561 .with_guarded_label_values(&[&format!("{}", job_id)]),
562 })
563 }
564
565 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
566 self.status
567 .fragment_infos()
568 .map(|fragment_infos| {
569 !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
570 })
571 .unwrap_or(true)
572 }
573
574 pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
575 let progress = match &self.status {
576 CreatingStreamingJobStatus::ConsumingSnapshot {
577 create_mview_tracker,
578 ..
579 } => {
580 if create_mview_tracker.is_finished() {
581 "Snapshot finished".to_owned()
582 } else {
583 let progress = create_mview_tracker.gen_backfill_progress();
584 format!("Snapshot [{}]", progress)
585 }
586 }
587 CreatingStreamingJobStatus::ConsumingLogStore {
588 log_store_progress_tracker,
589 ..
590 } => {
591 format!(
592 "LogStore [{}]",
593 log_store_progress_tracker.gen_backfill_progress()
594 )
595 }
596 CreatingStreamingJobStatus::Finishing(..) => {
597 format!(
598 "Finishing [epoch count: {}]",
599 self.barrier_control.inflight_barrier_count()
600 )
601 }
602 CreatingStreamingJobStatus::Resetting(_) => "Resetting".to_owned(),
603 CreatingStreamingJobStatus::PlaceHolder => {
604 unreachable!()
605 }
606 };
607 BackfillProgress {
608 progress,
609 backfill_type: PbBackfillType::SnapshotBackfill,
610 }
611 }
612
613 pub(super) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
614 (
615 max(
616 self.barrier_control.max_committed_epoch().unwrap_or(0),
617 self.snapshot_epoch,
618 ),
619 self.snapshot_backfill_upstream_tables.clone(),
620 )
621 }
622
623 #[expect(clippy::too_many_arguments)]
624 fn inject_barrier(
625 partial_graph_id: PartialGraphId,
626 partial_graph_manager: &mut PartialGraphManager,
627 barrier_control: &mut CreatingStreamingJobBarrierControl,
628 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
629 state_table_ids: &HashSet<TableId>,
630 is_finishing: bool,
631 barrier_info: BarrierInfo,
632 new_actors: Option<StreamJobActorsToCreate>,
633 mutation: Option<Mutation>,
634 notifiers: Vec<Notifier>,
635 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
636 ) -> MetaResult<()> {
637 let (table_ids_to_sync, nodes_to_sync_table) = if !is_finishing {
638 (Some(state_table_ids), Some(node_actors.keys().copied()))
639 } else {
640 (None, None)
641 };
642 let prev_epoch = barrier_info.prev_epoch();
643 partial_graph_manager.inject_barrier(
644 partial_graph_id,
645 mutation,
646 node_actors,
647 table_ids_to_sync.into_iter().flatten().copied(),
648 nodes_to_sync_table.into_iter().flatten(),
649 new_actors,
650 PartialGraphBarrierInfo::new(
651 first_create_info.map_or_else(
652 PostCollectCommand::barrier,
653 CreateSnapshotBackfillJobCommandInfo::into_post_collect,
654 ),
655 barrier_info,
656 notifiers,
657 state_table_ids.clone(),
658 ),
659 )?;
660 barrier_control.enqueue_epoch(prev_epoch);
661 Ok(())
662 }
663
664 pub(super) fn start_consume_upstream(
665 &mut self,
666 partial_graph_manager: &mut PartialGraphManager,
667 barrier_info: &BarrierInfo,
668 ) -> MetaResult<CreatingJobInfo> {
669 info!(
670 job_id = %self.job_id,
671 prev_epoch = barrier_info.prev_epoch(),
672 "start consuming upstream"
673 );
674 let info = self.status.start_consume_upstream(barrier_info);
675 Self::inject_barrier(
676 self.partial_graph_id,
677 partial_graph_manager,
678 &mut self.barrier_control,
679 &self.node_actors,
680 &self.state_table_ids,
681 true,
682 barrier_info.clone(),
683 None,
684 Some(Mutation::Stop(StopMutation {
685 actors: info
687 .fragment_infos
688 .values()
689 .flat_map(|info| info.actors.keys().copied())
690 .collect(),
691 dropped_sink_fragments: vec![], })),
693 vec![], None,
695 )?;
696 Ok(info)
697 }
698
699 pub(super) fn on_new_upstream_barrier(
700 &mut self,
701 partial_graph_manager: &mut PartialGraphManager,
702 barrier_info: &BarrierInfo,
703 mutation: Option<(Mutation, Vec<Notifier>)>,
704 ) -> MetaResult<()> {
705 let progress_epoch =
706 if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
707 max(max_committed_epoch, self.snapshot_epoch)
708 } else {
709 self.snapshot_epoch
710 };
711 self.upstream_lag.set(
712 barrier_info
713 .prev_epoch
714 .value()
715 .0
716 .saturating_sub(progress_epoch) as _,
717 );
718 let (mut mutation, mut notifiers) = match mutation {
719 Some((mutation, notifiers)) => (Some(mutation), notifiers),
720 None => (None, vec![]),
721 };
722 {
723 for (barrier_to_inject, mutation) in self
724 .status
725 .on_new_upstream_epoch(barrier_info, mutation.take())
726 {
727 Self::inject_barrier(
728 self.partial_graph_id,
729 partial_graph_manager,
730 &mut self.barrier_control,
731 &self.node_actors,
732 &self.state_table_ids,
733 false,
734 barrier_to_inject,
735 None,
736 mutation,
737 take(&mut notifiers),
738 None,
739 )?;
740 }
741 assert!(mutation.is_none(), "must have consumed mutation");
742 assert!(notifiers.is_empty(), "must consumed notifiers");
743 }
744 Ok(())
745 }
746
747 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
748 self.status.update_progress(
749 collected_barrier
750 .resps
751 .values()
752 .flat_map(|resp| &resp.create_mview_progress),
753 );
754 self.barrier_control.collect(collected_barrier);
755 self.should_merge_to_upstream()
756 }
757
758 pub(super) fn should_merge_to_upstream(&self) -> bool {
759 if let CreatingStreamingJobStatus::ConsumingLogStore {
760 log_store_progress_tracker,
761 barriers_to_inject,
762 ..
763 } = &self.status
764 && barriers_to_inject.is_none()
765 && log_store_progress_tracker.is_finished()
766 {
767 true
768 } else {
769 false
770 }
771 }
772}
773
774impl CreatingStreamingJobControl {
775 pub(super) fn start_completing(
776 &mut self,
777 partial_graph_manager: &mut PartialGraphManager,
778 min_upstream_inflight_epoch: Option<u64>,
779 upstream_committed_epoch: u64,
780 ) -> Option<(
781 u64,
782 Vec<BarrierCompleteResponse>,
783 PartialGraphBarrierInfo,
784 bool,
785 )> {
786 if upstream_committed_epoch < self.snapshot_epoch {
788 return None;
789 }
790 let (finished_at_epoch, epoch_end_bound) = match &self.status {
791 CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
792 let epoch_end_bound = min_upstream_inflight_epoch
793 .map(|upstream_epoch| {
794 if upstream_epoch < *finish_at_epoch {
795 Excluded(upstream_epoch)
796 } else {
797 Unbounded
798 }
799 })
800 .unwrap_or(Unbounded);
801 (Some(*finish_at_epoch), epoch_end_bound)
802 }
803 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
804 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
805 None,
806 min_upstream_inflight_epoch
807 .map(Excluded)
808 .unwrap_or(Unbounded),
809 ),
810 CreatingStreamingJobStatus::Resetting(..) => {
811 return None;
812 }
813 CreatingStreamingJobStatus::PlaceHolder => {
814 unreachable!()
815 }
816 };
817 self.barrier_control
818 .start_completing(epoch_end_bound, |epoch| {
819 partial_graph_manager.take_collected_barrier(self.partial_graph_id, epoch)
820 })
821 .map(|(epoch, resps, info)| {
822 let is_finish_epoch = if let Some(finish_at_epoch) = finished_at_epoch {
823 assert!(!info.post_collect_command.should_checkpoint());
824 if epoch == finish_at_epoch {
825 self.barrier_control.ack_completed(epoch);
826 assert!(self.barrier_control.is_empty());
827 true
828 } else {
829 false
830 }
831 } else {
832 false
833 };
834 (epoch, resps, info, is_finish_epoch)
835 })
836 }
837
838 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
839 self.barrier_control.ack_completed(completed_epoch);
840 }
841
842 pub fn fragment_infos_with_job_id(
843 &self,
844 ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
845 self.status
846 .fragment_infos()
847 .into_iter()
848 .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
849 }
850
851 pub(super) fn collect_reschedule_blocked_fragment_ids(
852 &self,
853 blocked_fragment_ids: &mut HashSet<FragmentId>,
854 ) {
855 let Some(info) = self.status.creating_job_info() else {
856 return;
857 };
858
859 for (fragment_id, fragment) in &info.fragment_infos {
860 if fragment_has_online_unreschedulable_scan(fragment) {
861 blocked_fragment_ids.insert(*fragment_id);
862 collect_fragment_upstream_fragment_ids(fragment, blocked_fragment_ids);
863 }
864 }
865 }
866
867 pub(super) fn collect_no_shuffle_fragment_relations(
868 &self,
869 no_shuffle_relations: &mut Vec<(FragmentId, FragmentId)>,
870 ) {
871 let Some(info) = self.status.creating_job_info() else {
872 return;
873 };
874
875 for (upstream_fragment_id, downstreams) in &info.upstream_fragment_downstreams {
876 no_shuffle_relations.extend(
877 downstreams
878 .iter()
879 .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
880 .map(|downstream| (*upstream_fragment_id, downstream.downstream_fragment_id)),
881 );
882 }
883
884 for (fragment_id, downstreams) in &info.downstreams {
885 no_shuffle_relations.extend(
886 downstreams
887 .iter()
888 .filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
889 .map(|downstream| (*fragment_id, downstream.downstream_fragment_id)),
890 );
891 }
892 }
893
894 pub fn into_tracking_job(self) -> TrackingJob {
895 assert!(self.barrier_control.is_empty());
896 match self.status {
897 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
898 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
899 | CreatingStreamingJobStatus::Resetting(..)
900 | CreatingStreamingJobStatus::PlaceHolder => {
901 unreachable!("expect finish")
902 }
903 CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
904 }
905 }
906
907 pub(super) fn on_partial_graph_reset(mut self) {
908 match &mut self.status {
909 CreatingStreamingJobStatus::Resetting(notifiers) => {
910 for notifier in notifiers.drain(..) {
911 notifier.notify_collected();
912 }
913 }
914 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
915 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
916 | CreatingStreamingJobStatus::Finishing(_, _) => {
917 panic!(
918 "should be resetting when receiving reset partial graph resp, but at {:?}",
919 self.status
920 )
921 }
922 CreatingStreamingJobStatus::PlaceHolder => {
923 unreachable!()
924 }
925 }
926 }
927
928 pub(super) fn drop(
932 &mut self,
933 notifiers: &mut Vec<Notifier>,
934 partial_graph_manager: &mut PartialGraphManager,
935 ) -> bool {
936 match &mut self.status {
937 CreatingStreamingJobStatus::Resetting(existing_notifiers) => {
938 for notifier in &mut *notifiers {
939 notifier.notify_started();
940 }
941 existing_notifiers.append(notifiers);
942 true
943 }
944 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
945 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
946 for notifier in &mut *notifiers {
947 notifier.notify_started();
948 }
949 partial_graph_manager.reset_partial_graphs([self.partial_graph_id]);
950 self.status = CreatingStreamingJobStatus::Resetting(take(notifiers));
951 true
952 }
953 CreatingStreamingJobStatus::Finishing(_, _) => false,
954 CreatingStreamingJobStatus::PlaceHolder => {
955 unreachable!()
956 }
957 }
958 }
959
960 pub(super) fn reset(self) -> bool {
961 match self.status {
962 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
963 | CreatingStreamingJobStatus::ConsumingLogStore { .. }
964 | CreatingStreamingJobStatus::Finishing(_, _) => false,
965 CreatingStreamingJobStatus::Resetting(notifiers) => {
966 for notifier in notifiers {
967 notifier.notify_collected();
968 }
969 true
970 }
971 CreatingStreamingJobStatus::PlaceHolder => {
972 unreachable!()
973 }
974 }
975 }
976}