mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, FragmentId};
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, StopMutation};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::checkpoint::recovery::ResetPartialGraphCollector;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob, collect_done_fragments};
use crate::barrier::rpc::{
    ControlStreamManager, build_locality_fragment_state_table_mapping, to_partial_graph_id,
};
use crate::barrier::{
    BackfillOrderState, BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{FragmentDownstreamRelation, StreamActor, StreamJobActorsToCreate};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ExtendedFragmentBackfillOrder, build_actor_connector_splits};

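/// Graph metadata of a creating snapshot backfill job: its fragment infos, the fragment
/// relations from upstream fragments into the job and from the job's own fragments to
/// their downstreams, the upstream tables being backfilled, and the job's stream actors.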
#[derive(Debug)]
pub(crate) struct CreatingJobInfo {
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub downstreams: FragmentDownstreamRelation,
    pub snapshot_backfill_upstream_tables: HashSet<TableId>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
}

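/// Per-job controller of a creating snapshot backfill streaming job. It owns the job's
/// dedicated partial graph: barrier injection/collection state lives in `barrier_control`
/// and the current backfill phase lives in `status`.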
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    snapshot_epoch: u64,

    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    state_table_ids: HashSet<TableId>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
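    /// Sets up control for a newly issued snapshot backfill job: registers its partial
    /// graph on the control stream and injects the first barrier carrying the `Add`
    /// mutation that creates the job's actors.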
    pub(super) fn new(
        create_info: CreateSnapshotBackfillJobCommandInfo,
        notifiers: Vec<Notifier>,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        snapshot_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let info = create_info.info.clone();
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [],
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, None);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
            notifiers,
            Some(create_info),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        let job_info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams: info.upstream_fragment_downstreams.clone(),
            downstreams: info.stream_job_fragments.downstreams.clone(),
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: info
                .stream_job_fragments
                .fragments
                .values()
                .flat_map(|fragment| {
                    fragment
                        .actors
                        .iter()
                        .map(|actor| (actor.actor_id, actor.clone()))
                })
                .collect(),
        };

        Ok(Self {
            database_id,
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            snapshot_epoch,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                snapshot_epoch,
                info: job_info,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        })
    }

    pub(super) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                info,
                ..
            } => create_mview_tracker.collect_fragment_progress(&info.fragment_infos, true),
            CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
                collect_done_fragments(self.job_id, &info.fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(_, _)
            | CreatingStreamingJobStatus::Resetting(_, _)
            | CreatingStreamingJobStatus::PlaceHolder => vec![],
        }
    }

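    /// Derives the barriers to replay from an upstream table's log store epochs, starting
    /// right after `exclusive_start_log_epoch` and ending at the current upstream barrier.
    /// If the start epoch already equals the upstream `prev_epoch`, there is nothing to
    /// replay from the log and only the final barrier is emitted.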
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        exclusive_start_log_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables
            .iter()
            .next()
            .expect("snapshot backfill job should have upstream");
        let epochs_iter = if let Some(epochs) = upstream_table_log_epochs.get(table_id) {
            let mut epochs_iter = epochs.iter();
            loop {
                let (_, checkpoint_epoch) =
                    epochs_iter.next().expect("not reach committed epoch yet");
                if *checkpoint_epoch < exclusive_start_log_epoch {
                    continue;
                }
                assert_eq!(*checkpoint_epoch, exclusive_start_log_epoch);
                break;
            }
            epochs_iter
        } else {
            assert_eq!(
                upstream_barrier_info.prev_epoch(),
                exclusive_start_log_epoch
            );
            static EMPTY_VEC: Vec<(Vec<u64>, u64)> = Vec::new();
            EMPTY_VEC.iter()
        };

        let mut ret = vec![];
        let mut prev_epoch = exclusive_start_log_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_barrier_info.curr_epoch())),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

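    /// Rebuilds the job status during recovery when the job has not yet finished consuming
    /// the snapshot (`committed_epoch < snapshot_epoch`). Returns the recovered
    /// `ConsumingSnapshot` status together with the first barrier to inject.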
    fn recover_consuming_snapshot(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
        backfill_order_state: BackfillOrderState,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            &info.fragment_infos,
            backfill_order_state,
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );

        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    &info.snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    snapshot_epoch,
                    upstream_barrier_info,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &info.fragment_infos,
                )
                .collect(),
                info,
                snapshot_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

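    /// Rebuilds the job status during recovery when the snapshot has already been consumed
    /// and the job is replaying the upstream log store. The first resolved barrier is
    /// turned into the `Initial` barrier to inject.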
    fn recover_consuming_log_store(
        job_id: JobId,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        info: CreatingJobInfo,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            &info.snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_barrier_info,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;

        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &info.fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&info.fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                info,
            },
            first_barrier,
        ))
    }

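    /// Recreates the control for a creating snapshot backfill job during meta recovery:
    /// rebuilds the job's fragment relations from `fragment_relations`, restores the
    /// backfill status from the committed epoch, re-registers the partial graph, and
    /// injects the first barrier with the recovered actors and mutation.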
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        snapshot_epoch: u64,
        committed_epoch: u64,
        upstream_barrier_info: &BarrierInfo,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order: ExtendedFragmentBackfillOrder,
        fragment_relations: &FragmentDownstreamRelation,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        info!(
            %job_id,
            "recovered creating snapshot backfill job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, snapshot_epoch, Some(committed_epoch));

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let mut upstream_fragment_downstreams: FragmentDownstreamRelation = Default::default();
        for (upstream_fragment_id, downstreams) in fragment_relations {
            if fragment_infos.contains_key(upstream_fragment_id) {
                continue;
            }
            for downstream in downstreams {
                if fragment_infos.contains_key(&downstream.downstream_fragment_id) {
                    upstream_fragment_downstreams
                        .entry(*upstream_fragment_id)
                        .or_default()
                        .push(downstream.clone());
                }
            }
        }
        let downstreams = fragment_infos
            .keys()
            .filter_map(|fragment_id| {
                fragment_relations
                    .get(fragment_id)
                    .map(|relation| (*fragment_id, relation.clone()))
            })
            .collect();

        let info = CreatingJobInfo {
            fragment_infos,
            upstream_fragment_downstreams,
            downstreams,
            snapshot_backfill_upstream_tables: snapshot_backfill_upstream_tables.clone(),
            stream_actors: new_actors
                .values()
                .flat_map(|fragments| {
                    fragments.values().flat_map(|(_, actors, _)| {
                        actors
                            .iter()
                            .map(|(actor, _, _)| (actor.actor_id, actor.clone()))
                    })
                })
                .collect(),
        };

        let (status, first_barrier_info) = if committed_epoch < snapshot_epoch {
            let locality_fragment_state_table_mapping =
                build_locality_fragment_state_table_mapping(&info.fragment_infos);
            let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                &backfill_order,
                &info.fragment_infos,
                locality_fragment_state_table_mapping,
            );
            Self::recover_consuming_snapshot(
                job_id,
                upstream_table_log_epochs,
                snapshot_epoch,
                committed_epoch,
                upstream_barrier_info,
                info,
                backfill_order_state,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_barrier_info,
                info,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));

        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
            vec![],
            None,
        )?;
        Ok(Self {
            database_id,
            job_id,
            snapshot_backfill_upstream_tables,
            snapshot_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && self
                .status
                .fragment_infos()
                .map(|fragment_infos| {
                    !InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
                })
                .unwrap_or(true)
    }

    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_backfill_progress();
                    format!("Snapshot [{}]", progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_backfill_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::Resetting(_, _) => "Resetting".to_owned(),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        BackfillProgress {
            progress,
            backfill_type: PbBackfillType::SnapshotBackfill,
        }
    }

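    /// The upstream log epoch this job still pins: the maximum of its latest committed
    /// epoch and the snapshot epoch.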
    pub(super) fn pinned_upstream_log_epoch(&self) -> u64 {
        max(
            self.barrier_control.max_committed_epoch().unwrap_or(0),
            self.snapshot_epoch,
        )
    }

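    /// Injects a barrier into the job's partial graph via the control stream manager and
    /// enqueues it in `barrier_control` to await collection. When `state_table_ids` is
    /// `Some`, all nodes hosting actors of the job are also asked to sync those tables.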
    #[expect(clippy::too_many_arguments)]
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
        mut notifiers: Vec<Notifier>,
        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    ) -> MetaResult<()> {
        let (state_table_ids, nodes_to_sync_table) = if let Some(state_table_ids) = state_table_ids
        {
            (Some(state_table_ids), Some(node_actors.keys().copied()))
        } else {
            (None, None)
        };
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            node_actors,
            state_table_ids.into_iter().flatten().copied(),
            nodes_to_sync_table.into_iter().flatten(),
            new_actors,
        )?;
        notifiers.iter_mut().for_each(|n| n.notify_started());
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
            notifiers,
            first_create_info,
        );
        Ok(())
    }

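    /// Transitions the job to consuming the upstream directly: injects a barrier carrying
    /// a `Stop` mutation for all actors of the job's fragments and returns the job info
    /// taken from the previous status.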
    pub(super) fn start_consume_upstream(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<CreatingJobInfo> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let info = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.database_id,
            self.job_id,
            control_stream_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            Some(Mutation::Stop(StopMutation {
                actors: info
                    .fragment_infos
                    .values()
                    .flat_map(|info| info.actors.keys().copied())
                    .collect(),
                dropped_sink_fragments: vec![],
            })),
            vec![],
            None,
        )?;
        Ok(info)
    }

    pub(super) fn on_new_upstream_barrier(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
        mutation: Option<(Mutation, Vec<Notifier>)>,
    ) -> MetaResult<()> {
        let progress_epoch =
            if let Some(max_committed_epoch) = self.barrier_control.max_committed_epoch() {
                max(max_committed_epoch, self.snapshot_epoch)
            } else {
                self.snapshot_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        let (mut mutation, mut notifiers) = match mutation {
            Some((mutation, notifiers)) => (Some(mutation), notifiers),
            None => (None, vec![]),
        };
        {
            for (barrier_to_inject, mutation) in self
                .status
                .on_new_upstream_epoch(barrier_info, mutation.take())
            {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.node_actors,
                    Some(&self.state_table_ids),
                    barrier_to_inject,
                    None,
                    mutation,
                    take(&mut notifiers),
                    None,
                )?;
            }
            assert!(mutation.is_none(), "must have consumed mutation");
            assert!(notifiers.is_empty(), "must have consumed notifiers");
        }
        Ok(())
    }

    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream()
    }

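    /// The job is ready to merge back into the upstream graph once it is consuming the
    /// log store, has no more barriers pending injection, and the log store progress
    /// tracker reports finished.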
    pub(super) fn should_merge_to_upstream(&self) -> bool {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            true
        } else {
            false
        }
    }
}

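/// How a completed epoch of a creating job should be handled when committing it.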
pub(super) enum CompleteJobType {
    /// The first completed barrier of the creating job, carrying the original create
    /// command info.
    First(CreateSnapshotBackfillJobCommandInfo),
    Normal,
    /// The barrier at which the creating job finishes.
    Finished,
}

impl CreatingStreamingJobControl {
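    /// Picks the next collected epoch of the creating job that can be committed, bounded
    /// by the upstream's minimum inflight epoch. Returns `None` until the upstream has
    /// committed the snapshot epoch, and while the job is resetting.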
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
        upstream_committed_epoch: u64,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        if upstream_committed_epoch < self.snapshot_epoch {
            return None;
        }
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::Resetting(_, _) => {
                return None;
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, create_job_info)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(create_job_info.is_none());
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if let Some(info) = create_job_info {
                    CompleteJobType::First(info)
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..)
            | CreatingStreamingJobStatus::Resetting(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }

    pub fn fragment_infos_with_job_id(
        &self,
    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
        self.status
            .fragment_infos()
            .into_iter()
            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
    }

    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Resetting(_, _)
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }

    pub(super) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(collector, notifiers) => {
                collector.collect(worker_id, resp);
                if collector.remaining_workers.is_empty() {
                    for notifier in notifiers.drain(..) {
                        notifier.notify_collected();
                    }
                    true
                } else {
                    false
                }
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => {
                panic!(
                    "should be resetting when receiving reset partial graph resp, but at {:?}",
                    self.status
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

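    /// Starts dropping the creating job: notifies the waiting notifiers as started and,
    /// unless the job is already resetting, resets its partial graph and moves the status
    /// to `Resetting`. Returns `false` only when the job is already finishing.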
    pub(super) fn drop(
        &mut self,
        notifiers: &mut Vec<Notifier>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> bool {
        match &mut self.status {
            CreatingStreamingJobStatus::Resetting(_, existing_notifiers) => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                existing_notifiers.append(notifiers);
                true
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                for notifier in &mut *notifiers {
                    notifier.notify_started();
                }
                let remaining_workers =
                    control_stream_manager.reset_partial_graphs(vec![to_partial_graph_id(
                        self.database_id,
                        Some(self.job_id),
                    )]);
                let collector = ResetPartialGraphCollector {
                    remaining_workers,
                    reset_resps: Default::default(),
                };
                self.status = CreatingStreamingJobStatus::Resetting(collector, take(notifiers));
                true
            }
            CreatingStreamingJobStatus::Finishing(_, _) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

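    /// Consumes the control during a reset: if the job was resetting, notifies the
    /// pending notifiers as collected and returns the collector holding the reset
    /// responses; otherwise returns `None`.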
    pub(super) fn reset(self) -> Option<ResetPartialGraphCollector> {
        match self.status {
            CreatingStreamingJobStatus::Resetting(collector, notifiers) => {
                for notifier in notifiers {
                    notifier.notify_collected();
                }
                Some(collector)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::Finishing(_, _) => None,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}