1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
32use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
33use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
34use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
35use risingwave_pb::stream_plan::{
36 AddMutation, PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
37 PbUpstreamSinkInfo, ThrottleMutation,
38};
39use tracing::warn;
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::CdcTableBackfillTracker;
43use crate::barrier::checkpoint::{
44 BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, CreatingStreamingJobControl,
45 DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52 SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60 ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
61 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70 fill_snapshot_backfill_epoch,
71};
72
73pub(in crate::barrier) struct BarrierWorkerState {
75 in_flight_prev_epoch: TracedEpoch,
80
81 pending_non_checkpoint_barriers: Vec<u64>,
83
84 is_paused: bool,
86}
87
88impl BarrierWorkerState {
89 pub(super) fn new() -> Self {
90 Self {
91 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92 pending_non_checkpoint_barriers: vec![],
93 is_paused: false,
94 }
95 }
96
97 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98 Self {
99 in_flight_prev_epoch,
100 pending_non_checkpoint_barriers: vec![],
101 is_paused,
102 }
103 }
104
105 pub fn is_paused(&self) -> bool {
106 self.is_paused
107 }
108
109 fn set_is_paused(&mut self, is_paused: bool) {
110 if self.is_paused != is_paused {
111 tracing::info!(
112 currently_paused = self.is_paused,
113 newly_paused = is_paused,
114 "update paused state"
115 );
116 self.is_paused = is_paused;
117 }
118 }
119
120 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121 &self.in_flight_prev_epoch
122 }
123
124 pub fn next_barrier_info(
126 &mut self,
127 is_checkpoint: bool,
128 curr_epoch: TracedEpoch,
129 ) -> BarrierInfo {
130 assert!(
131 self.in_flight_prev_epoch.value() < curr_epoch.value(),
132 "curr epoch regress. {} > {}",
133 self.in_flight_prev_epoch.value(),
134 curr_epoch.value()
135 );
136 let prev_epoch = self.in_flight_prev_epoch.clone();
137 self.in_flight_prev_epoch = curr_epoch.clone();
138 self.pending_non_checkpoint_barriers
139 .push(prev_epoch.value().0);
140 let kind = if is_checkpoint {
141 let epochs = take(&mut self.pending_non_checkpoint_barriers);
142 BarrierKind::Checkpoint(epochs)
143 } else {
144 BarrierKind::Barrier
145 };
146 BarrierInfo {
147 prev_epoch,
148 curr_epoch,
149 kind,
150 }
151 }
152}
153
/// Value returned by `DatabaseCheckpointControl::apply_command` to its caller.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): presumably the ids of streaming jobs whose barrier the caller must wait for;
    // it is populated in the part of `apply_command` not visible here — TODO confirm.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Per-command intermediate result inside `apply_command`, destructured there as
/// `(mutation, table_ids_to_commit, actors_to_create, node_actors, post_collect_command)`.
type ApplyCommandResult = (
    // Barrier mutation to inject, if any.
    Option<Mutation>,
    // Table ids to commit with this barrier.
    HashSet<TableId>,
    // Actors to create together with this barrier, if any.
    Option<StreamJobActorsToCreate>,
    // Actor ids to collect the barrier from, grouped by worker.
    HashMap<WorkerId, HashSet<ActorId>>,
    // Follow-up command carried alongside the barrier.
    PostCollectCommand,
);
167
/// Actors rendered by `render_actors` for the new fragments of a streaming job.
pub(crate) struct RenderResult {
    /// Rendered actors, grouped by the fragment they belong to.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker each rendered actor is placed on.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176pub(crate) fn resolve_no_shuffle_ensembles(
185 fragments: &StreamJobFragmentsToCreate,
186 upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191 for (upstream_fid, relations) in &fragments.downstreams {
193 for rel in relations {
194 if rel.dispatcher_type == DispatcherType::NoShuffle {
195 new_no_shuffle
196 .entry(*upstream_fid)
197 .or_default()
198 .insert(rel.downstream_fragment_id);
199 }
200 }
201 }
202
203 for (upstream_fid, relations) in upstream_fragment_downstreams {
205 for rel in relations {
206 if rel.dispatcher_type == DispatcherType::NoShuffle {
207 new_no_shuffle
208 .entry(*upstream_fid)
209 .or_default()
210 .insert(rel.downstream_fragment_id);
211 }
212 }
213 }
214
215 let mut ensembles = if new_no_shuffle.is_empty() {
216 Vec::new()
217 } else {
218 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220 .iter()
221 .flat_map(|(upstream_fid, downstream_fids)| {
222 downstream_fids
223 .iter()
224 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225 })
226 .collect();
227
228 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229 .iter()
230 .flat_map(|(u, d)| [*u, *d])
231 .collect::<HashSet<_>>()
232 .into_iter()
233 .collect();
234
235 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237 };
238
239 let covered: HashSet<FragmentId> = ensembles
241 .iter()
242 .flat_map(|e| e.component_fragments())
243 .collect();
244 for fragment_id in fragments.inner.fragments.keys() {
245 if !covered.contains(fragment_id) {
246 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247 }
248 }
249
250 Ok(ensembles)
251}
252
253pub(super) fn render_actors(
264 fragments: &StreamJobFragmentsToCreate,
265 database_info: &InflightDatabaseInfo,
266 definition: &str,
267 ctx: &StreamContext,
268 streaming_job_model: &streaming_job::Model,
269 actor_id_counter: &AtomicU32,
270 worker_map: &HashMap<WorkerId, WorkerNode>,
271 ensembles: &[NoShuffleEnsemble],
272 database_resource_group: &str,
273) -> MetaResult<RenderResult> {
274 let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
277 HashMap::new();
278
279 for ensemble in ensembles {
280 let existing_fragment_ids: Vec<FragmentId> = ensemble
285 .component_fragments()
286 .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
287 .collect();
288
289 let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
290 let template = EnsembleActorTemplate::from_existing_inflight_fragment(
291 database_info.fragment(first_existing),
292 );
293
294 for &other_fragment_id in &existing_fragment_ids[1..] {
297 let other = EnsembleActorTemplate::from_existing_inflight_fragment(
298 database_info.fragment(other_fragment_id),
299 );
300 template.assert_aligned_with(&other, first_existing, other_fragment_id);
301 }
302
303 template
304 } else {
305 let first_component = ensemble
307 .component_fragments()
308 .next()
309 .expect("ensemble must have at least one component");
310 let fragment = &fragments.inner.fragments[&first_component];
311 let distribution_type: DistributionType = fragment.distribution_type.into();
312 let vnode_count = fragment.vnode_count();
313
314 for fragment_id in ensemble.component_fragments() {
316 let f = &fragments.inner.fragments[&fragment_id];
317 assert_eq!(
318 vnode_count,
319 f.vnode_count(),
320 "component fragments {} and {} in the same no-shuffle ensemble have \
321 different vnode counts: {} vs {}",
322 first_component,
323 fragment_id,
324 vnode_count,
325 f.vnode_count(),
326 );
327 }
328
329 EnsembleActorTemplate::render_new(
330 streaming_job_model,
331 worker_map,
332 None,
333 database_resource_group.to_owned(),
334 distribution_type,
335 vnode_count,
336 )?
337 };
338
339 for fragment_id in ensemble.component_fragments() {
341 if !fragments.inner.fragments.contains_key(&fragment_id) {
342 continue; }
344 let fragment = &fragments.inner.fragments[&fragment_id];
345 let distribution_type: DistributionType = fragment.distribution_type.into();
346 let aligner =
347 ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
348 let assignments = aligner.align_component_actor(distribution_type);
349 actor_assignments.insert(fragment_id, assignments);
350 }
351 }
352
353 let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
355 let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
356
357 for (fragment_id, assignments) in &actor_assignments {
358 let mut actors = Vec::with_capacity(assignments.len());
359 for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
360 result_actor_location.insert(actor_id, *worker_id);
361 actors.push(StreamActor {
362 actor_id,
363 fragment_id: *fragment_id,
364 vnode_bitmap: vnode_bitmap.clone(),
365 mview_definition: definition.to_owned(),
366 expr_context: Some(ctx.to_expr_context()),
367 config_override: ctx.config_override.clone(),
368 });
369 }
370 result_stream_actors.insert(*fragment_id, actors);
371 }
372
373 Ok(RenderResult {
374 stream_actors: result_stream_actors,
375 actor_location: result_actor_location,
376 })
377}
378impl DatabaseCheckpointControl {
379 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
381 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
382 let node_actors =
383 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
384 (table_ids_to_commit, node_actors)
385 }
386
387 fn apply_simple_command(
391 &self,
392 mutation: Option<Mutation>,
393 command_name: &'static str,
394 ) -> ApplyCommandResult {
395 let (table_ids, node_actors) = self.collect_base_info();
396 (
397 mutation,
398 table_ids,
399 None,
400 node_actors,
401 PostCollectCommand::Command(command_name.to_owned()),
402 )
403 }
404
405 pub(super) fn apply_command(
408 &mut self,
409 command: Option<Command>,
410 notifiers: &mut Vec<Notifier>,
411 barrier_info: BarrierInfo,
412 partial_graph_manager: &mut PartialGraphManager,
413 hummock_version_stats: &HummockVersionStats,
414 worker_nodes: &HashMap<WorkerId, WorkerNode>,
415 ) -> MetaResult<ApplyCommandInfo> {
416 debug_assert!(
417 !matches!(
418 command,
419 Some(Command::RescheduleIntent {
420 reschedule_plan: None,
421 ..
422 })
423 ),
424 "reschedule intent must be resolved before apply"
425 );
426 if matches!(
427 command,
428 Some(Command::RescheduleIntent {
429 reschedule_plan: None,
430 ..
431 })
432 ) {
433 bail!("reschedule intent must be resolved before apply");
434 }
435
436 fn resolve_source_splits(
441 info: &CreateStreamingJobCommandInfo,
442 render_result: &RenderResult,
443 actor_no_shuffle: &ActorNewNoShuffle,
444 database_info: &InflightDatabaseInfo,
445 ) -> MetaResult<SplitAssignment> {
446 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
447 .stream_actors
448 .iter()
449 .map(|(fragment_id, actors)| {
450 (
451 *fragment_id,
452 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
453 )
454 })
455 .collect();
456 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
457 &info.stream_job_fragments,
458 &info.init_split_assignment,
459 &fragment_actor_ids,
460 )?;
461 resolved.extend(SourceManager::resolve_backfill_splits(
462 &info.stream_job_fragments,
463 actor_no_shuffle,
464 |fragment_id, actor_id| {
465 database_info
466 .fragment(fragment_id)
467 .actors
468 .get(&actor_id)
469 .map(|info| info.splits.clone())
470 },
471 )?);
472 Ok(resolved)
473 }
474
475 let mut throttle_for_creating_jobs: Option<(
477 HashSet<JobId>,
478 HashMap<FragmentId, ThrottleConfig>,
479 )> = None;
480
481 let (
485 mutation,
486 mut table_ids_to_commit,
487 mut actors_to_create,
488 mut node_actors,
489 post_collect_command,
490 ) = match command {
491 None => self.apply_simple_command(None, "barrier"),
492 Some(Command::CreateStreamingJob {
493 mut info,
494 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
495 cross_db_snapshot_backfill_info,
496 }) => {
497 let ensembles = resolve_no_shuffle_ensembles(
498 &info.stream_job_fragments,
499 &info.upstream_fragment_downstreams,
500 )?;
501 let actors = render_actors(
502 &info.stream_job_fragments,
503 &self.database_info,
504 &info.definition,
505 &info.stream_job_fragments.inner.ctx,
506 &info.streaming_job_model,
507 partial_graph_manager
508 .control_stream_manager()
509 .env
510 .actor_id_generator(),
511 worker_nodes,
512 &ensembles,
513 &info.database_resource_group,
514 )?;
515 {
516 assert!(!self.state.is_paused());
517 let snapshot_epoch = barrier_info.prev_epoch();
518 for snapshot_backfill_epoch in snapshot_backfill_info
520 .upstream_mv_table_id_to_backfill_epoch
521 .values_mut()
522 {
523 assert_eq!(
524 snapshot_backfill_epoch.replace(snapshot_epoch),
525 None,
526 "must not set previously"
527 );
528 }
529 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
530 fill_snapshot_backfill_epoch(
531 &mut fragment.nodes,
532 Some(&snapshot_backfill_info),
533 &cross_db_snapshot_backfill_info,
534 )?;
535 }
536 let job_id = info.stream_job_fragments.stream_job_id();
537 let snapshot_backfill_upstream_tables = snapshot_backfill_info
538 .upstream_mv_table_id_to_backfill_epoch
539 .keys()
540 .cloned()
541 .collect();
542
543 let mut edges = self.database_info.build_edge(
545 Some((&info, true)),
546 None,
547 None,
548 partial_graph_manager.control_stream_manager(),
549 &actors.stream_actors,
550 &actors.actor_location,
551 );
552 let resolved_split_assignment = resolve_source_splits(
554 &info,
555 &actors,
556 edges.actor_new_no_shuffle(),
557 &self.database_info,
558 )?;
559
560 let Entry::Vacant(entry) =
561 self.independent_checkpoint_job_controls.entry(job_id)
562 else {
563 panic!("duplicated creating snapshot backfill job {job_id}");
564 };
565
566 let job = CreatingStreamingJobControl::new(
567 entry,
568 CreateSnapshotBackfillJobCommandInfo {
569 info: info.clone(),
570 snapshot_backfill_info: snapshot_backfill_info.clone(),
571 cross_db_snapshot_backfill_info,
572 resolved_split_assignment: resolved_split_assignment.clone(),
573 refresh_interval_sec: None,
574 },
575 take(notifiers),
576 snapshot_backfill_upstream_tables,
577 snapshot_epoch,
578 hummock_version_stats,
579 partial_graph_manager,
580 &mut edges,
581 &resolved_split_assignment,
582 &actors,
583 )?;
584
585 if let Some(fragment_infos) = job.fragment_infos() {
586 self.database_info.shared_actor_infos.upsert(
587 self.database_id,
588 fragment_infos.values().map(|f| (f, job_id)),
589 );
590 }
591
592 for upstream_mv_table_id in snapshot_backfill_info
593 .upstream_mv_table_id_to_backfill_epoch
594 .keys()
595 {
596 self.database_info.register_subscriber(
597 upstream_mv_table_id.as_job_id(),
598 info.streaming_job.id().as_subscriber_id(),
599 SubscriberType::SnapshotBackfill,
600 );
601 }
602
603 let mutation = Command::create_streaming_job_to_mutation(
604 &info,
605 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
606 self.state.is_paused(),
607 &mut edges,
608 partial_graph_manager.control_stream_manager(),
609 None,
610 &resolved_split_assignment,
611 &actors.stream_actors,
612 &actors.actor_location,
613 )?;
614
615 let (table_ids, node_actors) = self.collect_base_info();
616 (
617 Some(mutation),
618 table_ids,
619 None,
620 node_actors,
621 PostCollectCommand::barrier(),
622 )
623 }
624 }
625 Some(Command::CreateStreamingJob {
626 mut info,
627 job_type: CreateStreamingJobType::BatchRefresh(mut batch_refresh_info),
628 cross_db_snapshot_backfill_info,
629 }) => {
630 {
631 if self.state.is_paused() {
632 bail!("cannot create batch refresh job while database barrier is paused");
633 }
634 let snapshot_epoch = barrier_info.prev_epoch();
635 let job_id = info.stream_job_fragments.stream_job_id();
636 let database_id = info.streaming_job.database_id();
637
638 let snapshot_backfill_info = &mut batch_refresh_info.snapshot_backfill_info;
640 for snapshot_backfill_epoch in snapshot_backfill_info
641 .upstream_mv_table_id_to_backfill_epoch
642 .values_mut()
643 {
644 assert_eq!(
645 snapshot_backfill_epoch.replace(snapshot_epoch),
646 None,
647 "must not set previously"
648 );
649 }
650 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
651 fill_snapshot_backfill_epoch(
652 &mut fragment.nodes,
653 Some(snapshot_backfill_info),
654 &cross_db_snapshot_backfill_info,
655 )?;
656 }
657 let snapshot_backfill_upstream_tables: HashSet<TableId> =
658 snapshot_backfill_info
659 .upstream_mv_table_id_to_backfill_epoch
660 .keys()
661 .cloned()
662 .collect();
663
664 let logical = BatchRefreshLogicalFragments {
666 fragments: info
667 .stream_job_fragments
668 .inner
669 .fragments
670 .iter()
671 .map(|(&fid, fragment)| {
672 (
673 fid,
674 LoadedFragment {
675 fragment_id: fid,
676 job_id,
677 fragment_type_mask: fragment.fragment_type_mask,
678 distribution_type: fragment.distribution_type.into(),
679 vnode_count: fragment.vnode_count(),
680 nodes: fragment.nodes.clone(),
681 state_table_ids: fragment
682 .state_table_ids
683 .iter()
684 .cloned()
685 .collect(),
686 parallelism: None,
687 },
688 )
689 })
690 .collect(),
691 downstreams: info.stream_job_fragments.downstreams.clone(),
692 };
693
694 assert!(
698 !self
699 .independent_checkpoint_job_controls
700 .contains_key(&job_id),
701 "duplicated creating batch refresh job {job_id}"
702 );
703
704 let snapshot_backfill_info_clone =
705 batch_refresh_info.snapshot_backfill_info.clone();
706 let refresh_interval_sec = batch_refresh_info.refresh_interval_sec;
707
708 let subscriber_id =
712 info.stream_job_fragments.stream_job_id().as_subscriber_id();
713 let mutation = Mutation::Add(AddMutation {
714 actor_dispatchers: Default::default(),
715 added_actors: Default::default(),
716 actor_splits: Default::default(),
717 pause: false,
718 subscriptions_to_add: snapshot_backfill_info_clone
719 .upstream_mv_table_id_to_backfill_epoch
720 .keys()
721 .map(|table_id| PbSubscriptionUpstreamInfo {
722 subscriber_id,
723 upstream_mv_table_id: *table_id,
724 })
725 .collect(),
726 backfill_nodes_to_pause: Default::default(),
727 actor_cdc_table_snapshot_splits: None,
728 new_upstream_sinks: Default::default(),
729 });
730
731 let job = BatchRefreshJobCheckpointControl::new(
732 database_id,
733 job_id,
734 CreateSnapshotBackfillJobCommandInfo {
735 info: info.clone(),
736 snapshot_backfill_info: snapshot_backfill_info_clone.clone(),
737 cross_db_snapshot_backfill_info,
738 resolved_split_assignment: Default::default(),
739 refresh_interval_sec: Some(refresh_interval_sec),
740 },
741 take(notifiers),
742 snapshot_backfill_upstream_tables,
743 snapshot_epoch,
744 hummock_version_stats,
745 partial_graph_manager,
746 &logical,
747 worker_nodes,
748 )?;
749
750 if let Some(fragment_infos) = job.fragment_infos() {
751 self.database_info.shared_actor_infos.upsert(
752 self.database_id,
753 fragment_infos.values().map(|f| (f, job_id)),
754 );
755 }
756
757 self.independent_checkpoint_job_controls
758 .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
759
760 for upstream_mv_table_id in snapshot_backfill_info_clone
762 .upstream_mv_table_id_to_backfill_epoch
763 .keys()
764 {
765 self.database_info.register_subscriber(
766 upstream_mv_table_id.as_job_id(),
767 info.streaming_job.id().as_subscriber_id(),
768 SubscriberType::SnapshotBackfill,
769 );
770 }
771
772 let (table_ids, node_actors) = self.collect_base_info();
773 (
774 Some(mutation),
775 table_ids,
776 None,
777 node_actors,
778 PostCollectCommand::barrier(),
779 )
780 }
781 }
782 Some(Command::CreateStreamingJob {
783 mut info,
784 job_type,
785 cross_db_snapshot_backfill_info,
786 }) => {
787 let ensembles = resolve_no_shuffle_ensembles(
788 &info.stream_job_fragments,
789 &info.upstream_fragment_downstreams,
790 )?;
791 let actors = render_actors(
792 &info.stream_job_fragments,
793 &self.database_info,
794 &info.definition,
795 &info.stream_job_fragments.inner.ctx,
796 &info.streaming_job_model,
797 partial_graph_manager
798 .control_stream_manager()
799 .env
800 .actor_id_generator(),
801 worker_nodes,
802 &ensembles,
803 &info.database_resource_group,
804 )?;
805 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
806 fill_snapshot_backfill_epoch(
807 &mut fragment.nodes,
808 None,
809 &cross_db_snapshot_backfill_info,
810 )?;
811 }
812
813 let new_upstream_sink =
815 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
816 Some(ctx)
817 } else {
818 None
819 };
820
821 let mut edges = self.database_info.build_edge(
822 Some((&info, false)),
823 None,
824 new_upstream_sink,
825 partial_graph_manager.control_stream_manager(),
826 &actors.stream_actors,
827 &actors.actor_location,
828 );
829 let resolved_split_assignment = resolve_source_splits(
831 &info,
832 &actors,
833 edges.actor_new_no_shuffle(),
834 &self.database_info,
835 )?;
836
837 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
839 let (fragment, _) =
840 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
841 .expect("should have parallel cdc fragment");
842 Some(CdcTableBackfillTracker::new(
843 fragment.fragment_id,
844 splits.clone(),
845 ))
846 } else {
847 None
848 };
849 self.database_info
850 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
851 self.database_info.pre_apply_new_fragments(
852 info.stream_job_fragments
853 .new_fragment_info(
854 &actors.stream_actors,
855 &actors.actor_location,
856 &resolved_split_assignment,
857 )
858 .map(|(fragment_id, fragment_infos)| {
859 (fragment_id, info.streaming_job.id(), fragment_infos)
860 }),
861 );
862 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
863 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
864 self.database_info.pre_apply_add_node_upstream(
865 downstream_fragment_id,
866 &PbUpstreamSinkInfo {
867 upstream_fragment_id: ctx.sink_fragment_id,
868 sink_output_schema: ctx.sink_output_fields.clone(),
869 project_exprs: ctx.project_exprs.clone(),
870 },
871 );
872 }
873
874 let (table_ids, node_actors) = self.collect_base_info();
875
876 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
878 &info,
879 &mut edges,
880 &actors.stream_actors,
881 &actors.actor_location,
882 ));
883
884 let actor_cdc_table_snapshot_splits = self
886 .database_info
887 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
888
889 let is_currently_paused = self.state.is_paused();
891 let mutation = Command::create_streaming_job_to_mutation(
892 &info,
893 &job_type,
894 is_currently_paused,
895 &mut edges,
896 partial_graph_manager.control_stream_manager(),
897 actor_cdc_table_snapshot_splits,
898 &resolved_split_assignment,
899 &actors.stream_actors,
900 &actors.actor_location,
901 )?;
902
903 (
904 Some(mutation),
905 table_ids,
906 actors_to_create,
907 node_actors,
908 PostCollectCommand::CreateStreamingJob {
909 info,
910 job_type,
911 cross_db_snapshot_backfill_info,
912 resolved_split_assignment,
913 },
914 )
915 }
916
917 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
918
919 Some(Command::Pause) => {
920 let prev_is_paused = self.state.is_paused();
921 self.state.set_is_paused(true);
922 let mutation = Command::pause_to_mutation(prev_is_paused);
923 let (table_ids, node_actors) = self.collect_base_info();
924 (
925 mutation,
926 table_ids,
927 None,
928 node_actors,
929 PostCollectCommand::Command("Pause".to_owned()),
930 )
931 }
932
933 Some(Command::Resume) => {
934 let prev_is_paused = self.state.is_paused();
935 self.state.set_is_paused(false);
936 let mutation = Command::resume_to_mutation(prev_is_paused);
937 let (table_ids, node_actors) = self.collect_base_info();
938 (
939 mutation,
940 table_ids,
941 None,
942 node_actors,
943 PostCollectCommand::Command("Resume".to_owned()),
944 )
945 }
946
947 Some(Command::Throttle { jobs, config }) => {
948 let mutation = Some(Command::throttle_to_mutation(&config));
949 for (fragment_id, throttle_config) in &config {
950 self.database_info
951 .pre_apply_throttle(*fragment_id, throttle_config);
952 }
953 throttle_for_creating_jobs = Some((jobs, config));
954 self.apply_simple_command(mutation, "Throttle")
955 }
956
957 Some(Command::DropStreamingJobs {
958 streaming_job_ids,
959 unregistered_state_table_ids: _,
960 dropped_sink_fragment_by_targets,
961 }) => {
962 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
964 self.database_info
965 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
966 }
967
968 let (table_ids, node_actors) = self.collect_base_info();
969
970 let mut actors = Vec::new();
971 for job_id in streaming_job_ids {
972 let Some(job) = self.database_info.post_apply_remove_job(job_id) else {
973 warn!(
974 %job_id,
975 "skip drop payload for streaming job that has already been removed from barrier worker"
976 );
977 continue;
978 };
979
980 for fragment in job.fragment_infos.values() {
981 actors.extend(fragment.actors.keys().copied());
982 }
983 }
984
985 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
986 &actors,
987 &dropped_sink_fragment_by_targets,
988 ));
989 (
990 mutation,
991 table_ids,
992 None,
993 node_actors,
994 PostCollectCommand::DropStreamingJobs,
995 )
996 }
997
998 Some(Command::RescheduleIntent {
999 reschedule_plan, ..
1000 }) => {
1001 let ReschedulePlan {
1002 reschedules,
1003 fragment_actors,
1004 } = reschedule_plan
1005 .as_ref()
1006 .expect("reschedule intent should be resolved in global barrier worker");
1007
1008 for (fragment_id, reschedule) in reschedules {
1010 self.database_info.pre_apply_reschedule(
1011 *fragment_id,
1012 reschedule
1013 .added_actors
1014 .iter()
1015 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
1016 actors.iter().map(|actor_id| {
1017 (
1018 *actor_id,
1019 InflightActorInfo {
1020 worker_id: *node_id,
1021 vnode_bitmap: reschedule
1022 .newly_created_actors
1023 .get(actor_id)
1024 .expect("should exist")
1025 .0
1026 .0
1027 .vnode_bitmap
1028 .clone(),
1029 splits: reschedule
1030 .actor_splits
1031 .get(actor_id)
1032 .cloned()
1033 .unwrap_or_default(),
1034 },
1035 )
1036 })
1037 })
1038 .collect(),
1039 reschedule
1040 .vnode_bitmap_updates
1041 .iter()
1042 .filter(|(actor_id, _)| {
1043 !reschedule.newly_created_actors.contains_key(*actor_id)
1044 })
1045 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
1046 .collect(),
1047 reschedule.actor_splits.clone(),
1048 );
1049 }
1050
1051 let (table_ids, node_actors) = self.collect_base_info();
1052
1053 let actors_to_create = Some(Command::reschedule_actors_to_create(
1055 reschedules,
1056 fragment_actors,
1057 &self.database_info,
1058 partial_graph_manager.control_stream_manager(),
1059 ));
1060
1061 self.database_info
1063 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
1064 (
1065 *fragment_id,
1066 reschedule.removed_actors.iter().cloned().collect(),
1067 )
1068 }));
1069
1070 let mutation = Command::reschedule_to_mutation(
1072 reschedules,
1073 fragment_actors,
1074 partial_graph_manager.control_stream_manager(),
1075 &mut self.database_info,
1076 )?;
1077
1078 let reschedules = reschedule_plan
1079 .expect("reschedule intent should be resolved in global barrier worker")
1080 .reschedules;
1081 (
1082 mutation,
1083 table_ids,
1084 actors_to_create,
1085 node_actors,
1086 PostCollectCommand::Reschedule { reschedules },
1087 )
1088 }
1089
1090 Some(Command::ReplaceStreamJob(plan)) => {
1091 let ensembles = resolve_no_shuffle_ensembles(
1092 &plan.new_fragments,
1093 &plan.upstream_fragment_downstreams,
1094 )?;
1095 let mut render_result = render_actors(
1096 &plan.new_fragments,
1097 &self.database_info,
1098 "", &plan.new_fragments.inner.ctx,
1100 &plan.streaming_job_model,
1101 partial_graph_manager
1102 .control_stream_manager()
1103 .env
1104 .actor_id_generator(),
1105 worker_nodes,
1106 &ensembles,
1107 &plan.database_resource_group,
1108 )?;
1109
1110 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1113 let actor_id_counter = partial_graph_manager
1114 .control_stream_manager()
1115 .env
1116 .actor_id_generator();
1117 for sink_ctx in sinks {
1118 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
1119 let original_frag_info = self.database_info.fragment(original_fragment_id);
1120 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
1121 original_frag_info,
1122 );
1123 let new_aligner = ComponentFragmentAligner::new_persistent(
1124 &actor_template,
1125 actor_id_counter,
1126 );
1127 let distribution_type: DistributionType =
1128 sink_ctx.new_fragment.distribution_type.into();
1129 let actor_assignments =
1130 new_aligner.align_component_actor(distribution_type);
1131 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
1132 let mut actors = Vec::with_capacity(actor_assignments.len());
1133 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
1134 render_result.actor_location.insert(actor_id, *worker_id);
1135 actors.push(StreamActor {
1136 actor_id,
1137 fragment_id: new_fragment_id,
1138 vnode_bitmap: vnode_bitmap.clone(),
1139 mview_definition: String::new(),
1140 expr_context: Some(sink_ctx.ctx.to_expr_context()),
1141 config_override: sink_ctx.ctx.config_override.clone(),
1142 });
1143 }
1144 render_result.stream_actors.insert(new_fragment_id, actors);
1145 }
1146 }
1147
1148 let mut edges = self.database_info.build_edge(
1150 None,
1151 Some(&plan),
1152 None,
1153 partial_graph_manager.control_stream_manager(),
1154 &render_result.stream_actors,
1155 &render_result.actor_location,
1156 );
1157
1158 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1160 .stream_actors
1161 .iter()
1162 .map(|(fragment_id, actors)| {
1163 (
1164 *fragment_id,
1165 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1166 )
1167 })
1168 .collect();
1169 let resolved_split_assignment = match &plan.split_plan {
1170 ReplaceJobSplitPlan::Discovered(discovered) => {
1171 SourceManager::resolve_fragment_to_actor_splits(
1172 &plan.new_fragments,
1173 discovered,
1174 &fragment_actor_ids,
1175 )?
1176 }
1177 ReplaceJobSplitPlan::AlignFromPrevious => {
1178 SourceManager::resolve_replace_source_splits(
1179 &plan.new_fragments,
1180 &plan.replace_upstream,
1181 edges.actor_new_no_shuffle(),
1182 |_fragment_id, actor_id| {
1183 self.database_info.fragment_infos().find_map(|fragment| {
1184 fragment
1185 .actors
1186 .get(&actor_id)
1187 .map(|info| info.splits.clone())
1188 })
1189 },
1190 )?
1191 }
1192 };
1193
1194 self.database_info.pre_apply_new_fragments(
1196 plan.new_fragments
1197 .new_fragment_info(
1198 &render_result.stream_actors,
1199 &render_result.actor_location,
1200 &resolved_split_assignment,
1201 )
1202 .map(|(fragment_id, new_fragment)| {
1203 (fragment_id, plan.streaming_job.id(), new_fragment)
1204 }),
1205 );
1206 for (fragment_id, replace_map) in &plan.replace_upstream {
1207 self.database_info
1208 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1209 }
1210 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1211 self.database_info
1212 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1213 (
1214 sink.new_fragment.fragment_id,
1215 sink.original_sink.id.as_job_id(),
1216 sink.new_fragment_info(
1217 &render_result.stream_actors,
1218 &render_result.actor_location,
1219 ),
1220 )
1221 }));
1222 }
1223
1224 let (table_ids, node_actors) = self.collect_base_info();
1225
1226 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1228 &plan,
1229 &mut edges,
1230 &self.database_info,
1231 &render_result.stream_actors,
1232 &render_result.actor_location,
1233 ));
1234
1235 let mutation = Command::replace_stream_job_to_mutation(
1238 &plan,
1239 &mut edges,
1240 &mut self.database_info,
1241 &resolved_split_assignment,
1242 )?;
1243
1244 {
1246 let mut fragment_ids_to_remove: Vec<_> = plan
1247 .old_fragments
1248 .fragments
1249 .values()
1250 .map(|f| f.fragment_id)
1251 .collect();
1252 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1253 fragment_ids_to_remove
1254 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1255 }
1256 self.database_info
1257 .post_apply_remove_fragments(fragment_ids_to_remove);
1258 }
1259
1260 (
1261 mutation,
1262 table_ids,
1263 actors_to_create,
1264 node_actors,
1265 PostCollectCommand::ReplaceStreamJob {
1266 plan,
1267 resolved_split_assignment,
1268 },
1269 )
1270 }
1271
1272 Some(Command::SourceChangeSplit(split_state)) => {
1273 self.database_info.pre_apply_split_assignments(
1275 split_state
1276 .split_assignment
1277 .iter()
1278 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1279 );
1280
1281 let mutation = Some(Command::source_change_split_to_mutation(
1282 &split_state.split_assignment,
1283 ));
1284 let (table_ids, node_actors) = self.collect_base_info();
1285 (
1286 mutation,
1287 table_ids,
1288 None,
1289 node_actors,
1290 PostCollectCommand::SourceChangeSplit {
1291 split_assignment: split_state.split_assignment,
1292 },
1293 )
1294 }
1295
1296 Some(Command::CreateSubscription {
1297 subscription_id,
1298 upstream_mv_table_id,
1299 retention_second,
1300 }) => {
1301 self.database_info.register_subscriber(
1302 upstream_mv_table_id.as_job_id(),
1303 subscription_id.as_subscriber_id(),
1304 SubscriberType::Subscription(retention_second),
1305 );
1306 let mutation = Some(Command::create_subscription_to_mutation(
1307 upstream_mv_table_id,
1308 subscription_id,
1309 ));
1310 let (table_ids, node_actors) = self.collect_base_info();
1311 (
1312 mutation,
1313 table_ids,
1314 None,
1315 node_actors,
1316 PostCollectCommand::CreateSubscription { subscription_id },
1317 )
1318 }
1319
1320 Some(Command::DropSubscription {
1321 subscription_id,
1322 upstream_mv_table_id,
1323 }) => {
1324 if self
1325 .database_info
1326 .unregister_subscriber(
1327 upstream_mv_table_id.as_job_id(),
1328 subscription_id.as_subscriber_id(),
1329 )
1330 .is_none()
1331 {
1332 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1333 }
1334 let mutation = Some(Command::drop_subscription_to_mutation(
1335 upstream_mv_table_id,
1336 subscription_id,
1337 ));
1338 let (table_ids, node_actors) = self.collect_base_info();
1339 (
1340 mutation,
1341 table_ids,
1342 None,
1343 node_actors,
1344 PostCollectCommand::Command("DropSubscription".to_owned()),
1345 )
1346 }
1347
1348 Some(Command::AlterSubscriptionRetention {
1349 subscription_id,
1350 upstream_mv_table_id,
1351 retention_second,
1352 }) => {
1353 self.database_info.update_subscription_retention(
1354 upstream_mv_table_id.as_job_id(),
1355 subscription_id.as_subscriber_id(),
1356 retention_second,
1357 );
1358 self.apply_simple_command(None, "AlterSubscriptionRetention")
1359 }
1360
1361 Some(Command::ConnectorPropsChange(config)) => {
1362 let mutation = Some(Command::connector_props_change_to_mutation(&config));
1363 let (table_ids, node_actors) = self.collect_base_info();
1364 (
1365 mutation,
1366 table_ids,
1367 None,
1368 node_actors,
1369 PostCollectCommand::ConnectorPropsChange(config),
1370 )
1371 }
1372
1373 Some(Command::Refresh {
1374 table_id,
1375 associated_source_id,
1376 }) => {
1377 let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1378 self.apply_simple_command(mutation, "Refresh")
1379 }
1380
1381 Some(Command::ListFinish {
1382 table_id: _,
1383 associated_source_id,
1384 }) => {
1385 let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1386 self.apply_simple_command(mutation, "ListFinish")
1387 }
1388
1389 Some(Command::LoadFinish {
1390 table_id: _,
1391 associated_source_id,
1392 }) => {
1393 let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1394 self.apply_simple_command(mutation, "LoadFinish")
1395 }
1396
1397 Some(Command::ResetSource { source_id }) => {
1398 let mutation = Some(Command::reset_source_to_mutation(source_id));
1399 self.apply_simple_command(mutation, "ResetSource")
1400 }
1401
1402 Some(Command::ResumeBackfill { target }) => {
1403 let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1404 let (table_ids, node_actors) = self.collect_base_info();
1405 (
1406 mutation,
1407 table_ids,
1408 None,
1409 node_actors,
1410 PostCollectCommand::ResumeBackfill { target },
1411 )
1412 }
1413
1414 Some(Command::InjectSourceOffsets {
1415 source_id,
1416 split_offsets,
1417 }) => {
1418 let mutation = Some(Command::inject_source_offsets_to_mutation(
1419 source_id,
1420 &split_offsets,
1421 ));
1422 self.apply_simple_command(mutation, "InjectSourceOffsets")
1423 }
1424 };
1425
1426 let mut finished_snapshot_backfill_jobs = HashSet::new();
1427 let mutation = match mutation {
1428 Some(mutation) => Some(mutation),
1429 None => {
1430 let mut finished_snapshot_backfill_job_info = HashMap::new();
1431 if barrier_info.kind.is_checkpoint() {
1432 for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1433 if let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1434 job
1435 && creating_job.should_merge_to_upstream()
1436 {
1437 let info = creating_job
1438 .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1439 finished_snapshot_backfill_job_info
1440 .try_insert(job_id, info)
1441 .expect("non-duplicated");
1442 }
1443 }
1444 }
1445
1446 if !finished_snapshot_backfill_job_info.is_empty() {
1447 let actors_to_create = actors_to_create.get_or_insert_default();
1448 let mut subscriptions_to_drop = vec![];
1449 let mut dispatcher_update = vec![];
1450 let mut actor_splits = HashMap::new();
1451 for (job_id, info) in finished_snapshot_backfill_job_info {
1452 finished_snapshot_backfill_jobs.insert(job_id);
1453 subscriptions_to_drop.extend(
1454 info.snapshot_backfill_upstream_tables.iter().map(
1455 |upstream_table_id| PbSubscriptionUpstreamInfo {
1456 subscriber_id: job_id.as_subscriber_id(),
1457 upstream_mv_table_id: *upstream_table_id,
1458 },
1459 ),
1460 );
1461 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1462 assert_matches!(
1463 self.database_info.unregister_subscriber(
1464 upstream_mv_table_id.as_job_id(),
1465 job_id.as_subscriber_id()
1466 ),
1467 Some(SubscriberType::SnapshotBackfill)
1468 );
1469 }
1470
1471 table_ids_to_commit.extend(
1472 info.fragment_infos
1473 .values()
1474 .flat_map(|fragment| fragment.state_table_ids.iter())
1475 .copied(),
1476 );
1477
1478 let actor_len = info
1479 .fragment_infos
1480 .values()
1481 .map(|fragment| fragment.actors.len() as u64)
1482 .sum();
1483 let id_gen = GlobalActorIdGen::new(
1484 partial_graph_manager
1485 .control_stream_manager()
1486 .env
1487 .actor_id_generator(),
1488 actor_len,
1489 );
1490 let mut next_local_actor_id = 0;
1491 let actor_mapping: HashMap<_, _> = info
1493 .fragment_infos
1494 .values()
1495 .flat_map(|fragment| fragment.actors.keys())
1496 .map(|old_actor_id| {
1497 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1498 next_local_actor_id += 1;
1499 (*old_actor_id, new_actor_id.as_global_id())
1500 })
1501 .collect();
1502 let actor_mapping = &actor_mapping;
1503 let new_stream_actors: HashMap<_, _> = info
1504 .stream_actors
1505 .into_iter()
1506 .map(|(old_actor_id, mut actor)| {
1507 let new_actor_id = actor_mapping[&old_actor_id];
1508 actor.actor_id = new_actor_id;
1509 (new_actor_id, actor)
1510 })
1511 .collect();
1512 let new_fragment_info: HashMap<_, _> = info
1513 .fragment_infos
1514 .into_iter()
1515 .map(|(fragment_id, mut fragment)| {
1516 let actors = take(&mut fragment.actors);
1517 fragment.actors = actors
1518 .into_iter()
1519 .map(|(old_actor_id, actor)| {
1520 let new_actor_id = actor_mapping[&old_actor_id];
1521 (new_actor_id, actor)
1522 })
1523 .collect();
1524 (fragment_id, fragment)
1525 })
1526 .collect();
1527 actor_splits.extend(
1528 new_fragment_info
1529 .values()
1530 .flat_map(|fragment| &fragment.actors)
1531 .map(|(actor_id, actor)| {
1532 (
1533 *actor_id,
1534 ConnectorSplits {
1535 splits: actor
1536 .splits
1537 .iter()
1538 .map(ConnectorSplit::from)
1539 .collect(),
1540 },
1541 )
1542 }),
1543 );
1544 let partial_graph_id = to_partial_graph_id(self.database_id, None);
1546 let mut edge_builder = FragmentEdgeBuilder::new(
1547 info.upstream_fragment_downstreams
1548 .keys()
1549 .map(|upstream_fragment_id| {
1550 self.database_info.fragment(*upstream_fragment_id)
1551 })
1552 .chain(new_fragment_info.values())
1553 .map(|fragment| {
1554 (
1555 fragment.fragment_id,
1556 EdgeBuilderFragmentInfo::from_inflight(
1557 fragment,
1558 partial_graph_id,
1559 partial_graph_manager.control_stream_manager(),
1560 ),
1561 )
1562 }),
1563 );
1564 edge_builder.add_relations(&info.upstream_fragment_downstreams);
1565 edge_builder.add_relations(&info.downstreams);
1566 let mut edges = edge_builder.build();
1567 let new_actors_to_create = edges.collect_actors_to_create(
1568 new_fragment_info.values().map(|fragment| {
1569 (
1570 fragment.fragment_id,
1571 &fragment.nodes,
1572 fragment.actors.iter().map(|(actor_id, actor)| {
1573 (&new_stream_actors[actor_id], actor.worker_id)
1574 }),
1575 [], )
1577 }),
1578 );
1579 dispatcher_update.extend(
1580 info.upstream_fragment_downstreams.keys().flat_map(
1581 |upstream_fragment_id| {
1582 let new_actor_dispatchers = edges
1583 .dispatchers
1584 .remove(upstream_fragment_id)
1585 .expect("should exist");
1586 new_actor_dispatchers.into_iter().flat_map(
1587 |(upstream_actor_id, dispatchers)| {
1588 dispatchers.into_iter().map(move |dispatcher| {
1589 PbDispatcherUpdate {
1590 actor_id: upstream_actor_id,
1591 dispatcher_id: dispatcher.dispatcher_id,
1592 hash_mapping: dispatcher.hash_mapping,
1593 removed_downstream_actor_id: dispatcher
1594 .downstream_actor_id
1595 .iter()
1596 .map(|new_downstream_actor_id| {
1597 actor_mapping
1598 .iter()
1599 .find_map(
1600 |(old_actor_id, new_actor_id)| {
1601 (new_downstream_actor_id
1602 == new_actor_id)
1603 .then_some(*old_actor_id)
1604 },
1605 )
1606 .expect("should exist")
1607 })
1608 .collect(),
1609 added_downstream_actor_id: dispatcher
1610 .downstream_actor_id,
1611 }
1612 })
1613 },
1614 )
1615 },
1616 ),
1617 );
1618 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1619 for (worker_id, worker_actors) in new_actors_to_create {
1620 node_actors.entry(worker_id).or_default().extend(
1621 worker_actors.values().flat_map(|(_, actors, _)| {
1622 actors.iter().map(|(actor, _, _)| actor.actor_id)
1623 }),
1624 );
1625 actors_to_create
1626 .entry(worker_id)
1627 .or_default()
1628 .extend(worker_actors);
1629 }
1630 self.database_info.add_existing(InflightStreamingJobInfo {
1631 job_id,
1632 fragment_infos: new_fragment_info,
1633 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
1635 cdc_table_backfill_tracker: None, });
1637 }
1638
1639 Some(PbMutation::Update(PbUpdateMutation {
1640 dispatcher_update,
1641 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
1645 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
1649 }))
1650 } else {
1651 let fragment_ids = self.database_info.take_pending_backfill_nodes();
1652 if fragment_ids.is_empty() {
1653 None
1654 } else {
1655 Some(PbMutation::StartFragmentBackfill(
1656 PbStartFragmentBackfillMutation { fragment_ids },
1657 ))
1658 }
1659 }
1660 }
1661 };
1662
1663 for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1665 match job {
1666 IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
1667 if !finished_snapshot_backfill_jobs.contains(job_id) {
1668 let throttle_mutation = if let Some((ref jobs, ref config)) =
1669 throttle_for_creating_jobs
1670 && jobs.contains(job_id)
1671 {
1672 assert_eq!(
1673 jobs.len(),
1674 1,
1675 "should not alter rate limit of snapshot backfill job with other jobs"
1676 );
1677 Some((
1678 Mutation::Throttle(ThrottleMutation {
1679 fragment_throttle: config
1680 .iter()
1681 .map(|(fragment_id, config)| (*fragment_id, *config))
1682 .collect(),
1683 }),
1684 take(notifiers),
1685 ))
1686 } else {
1687 None
1688 };
1689 creating_job.on_new_upstream_barrier(
1690 partial_graph_manager,
1691 &barrier_info,
1692 throttle_mutation,
1693 )?;
1694 }
1695 }
1696 IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
1697 batch_refresh_job.on_new_upstream_barrier(
1698 partial_graph_manager,
1699 &barrier_info,
1700 None, )?;
1702 }
1703 }
1704 }
1705
1706 partial_graph_manager.inject_barrier(
1707 to_partial_graph_id(self.database_id, None),
1708 mutation,
1709 &node_actors,
1710 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1711 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1712 actors_to_create,
1713 PartialGraphBarrierInfo::new(
1714 post_collect_command,
1715 barrier_info,
1716 take(notifiers),
1717 table_ids_to_commit,
1718 ),
1719 )?;
1720
1721 Ok(ApplyCommandInfo {
1722 jobs_to_wait: finished_snapshot_backfill_jobs,
1723 })
1724 }
1725}