1use std::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
32use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
33use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
34use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
35use risingwave_pb::stream_plan::{
36 AddMutation, PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
37 PbUpstreamSinkInfo, ThrottleMutation,
38};
39use tracing::warn;
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::CdcTableBackfillTracker;
43use crate::barrier::checkpoint::{
44 BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, CreatingStreamingJobControl,
45 DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52 SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60 ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
61 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70 fill_snapshot_backfill_epoch,
71};
72
/// Epoch and pause bookkeeping used when generating the next barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// Epoch that the next barrier will carry as its `prev_epoch`.
    ///
    /// `next_barrier_info` replaces it with the `curr_epoch` of the barrier it just produced.
    in_flight_prev_epoch: TracedEpoch,

    /// Raw `prev_epoch` values recorded by `next_barrier_info` since the last checkpoint
    /// barrier. The whole list is moved into `BarrierKind::Checkpoint` when a checkpoint
    /// barrier is produced.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Paused flag; only changed through `set_is_paused`.
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89 pub(super) fn new() -> Self {
90 Self {
91 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92 pending_non_checkpoint_barriers: vec![],
93 is_paused: false,
94 }
95 }
96
97 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98 Self {
99 in_flight_prev_epoch,
100 pending_non_checkpoint_barriers: vec![],
101 is_paused,
102 }
103 }
104
105 pub fn is_paused(&self) -> bool {
106 self.is_paused
107 }
108
109 fn set_is_paused(&mut self, is_paused: bool) {
110 if self.is_paused != is_paused {
111 tracing::info!(
112 currently_paused = self.is_paused,
113 newly_paused = is_paused,
114 "update paused state"
115 );
116 self.is_paused = is_paused;
117 }
118 }
119
120 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121 &self.in_flight_prev_epoch
122 }
123
124 pub fn next_barrier_info(
126 &mut self,
127 is_checkpoint: bool,
128 curr_epoch: TracedEpoch,
129 ) -> BarrierInfo {
130 assert!(
131 self.in_flight_prev_epoch.value() < curr_epoch.value(),
132 "curr epoch regress. {} > {}",
133 self.in_flight_prev_epoch.value(),
134 curr_epoch.value()
135 );
136 let prev_epoch = self.in_flight_prev_epoch.clone();
137 self.in_flight_prev_epoch = curr_epoch.clone();
138 self.pending_non_checkpoint_barriers
139 .push(prev_epoch.value().0);
140 let kind = if is_checkpoint {
141 let epochs = take(&mut self.pending_non_checkpoint_barriers);
142 BarrierKind::Checkpoint(epochs)
143 } else {
144 BarrierKind::Barrier
145 };
146 BarrierInfo {
147 prev_epoch,
148 curr_epoch,
149 kind,
150 }
151 }
152}
153
/// Value returned by `DatabaseCheckpointControl::apply_command` on success.
pub(super) struct ApplyCommandInfo {
    /// Ids of jobs to wait for.
    // NOTE(review): what exactly is awaited is decided in the tail of `apply_command`;
    // confirm the precise meaning there.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Per-command tuple produced inside `apply_command`, in the order it is destructured there.
type ApplyCommandResult = (
    // Mutation for this command, if any.
    Option<Mutation>,
    // Table ids to commit.
    HashSet<TableId>,
    // Actors to create for this command, if any.
    Option<StreamJobActorsToCreate>,
    // Actor ids to collect, grouped by worker.
    HashMap<WorkerId, HashSet<ActorId>>,
    // Post-collect command associated with this command.
    PostCollectCommand,
);
167
/// Output of `render_actors`: the rendered actors and where each one is placed.
pub(crate) struct RenderResult {
    /// Rendered actors, grouped by the fragment they belong to.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker assigned to each rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176pub(crate) fn resolve_no_shuffle_ensembles(
185 fragments: &StreamJobFragmentsToCreate,
186 upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191 for (upstream_fid, relations) in &fragments.downstreams {
193 for rel in relations {
194 if rel.dispatcher_type == DispatcherType::NoShuffle {
195 new_no_shuffle
196 .entry(*upstream_fid)
197 .or_default()
198 .insert(rel.downstream_fragment_id);
199 }
200 }
201 }
202
203 for (upstream_fid, relations) in upstream_fragment_downstreams {
205 for rel in relations {
206 if rel.dispatcher_type == DispatcherType::NoShuffle {
207 new_no_shuffle
208 .entry(*upstream_fid)
209 .or_default()
210 .insert(rel.downstream_fragment_id);
211 }
212 }
213 }
214
215 let mut ensembles = if new_no_shuffle.is_empty() {
216 Vec::new()
217 } else {
218 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220 .iter()
221 .flat_map(|(upstream_fid, downstream_fids)| {
222 downstream_fids
223 .iter()
224 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225 })
226 .collect();
227
228 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229 .iter()
230 .flat_map(|(u, d)| [*u, *d])
231 .collect::<HashSet<_>>()
232 .into_iter()
233 .collect();
234
235 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237 };
238
239 let covered: HashSet<FragmentId> = ensembles
241 .iter()
242 .flat_map(|e| e.component_fragments())
243 .collect();
244 for fragment_id in fragments.inner.fragments.keys() {
245 if !covered.contains(fragment_id) {
246 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247 }
248 }
249
250 Ok(ensembles)
251}
252
253pub(super) fn render_actors(
264 fragments: &StreamJobFragmentsToCreate,
265 database_info: &InflightDatabaseInfo,
266 definition: &str,
267 ctx: &StreamContext,
268 streaming_job_model: &streaming_job::Model,
269 actor_id_counter: &AtomicU32,
270 worker_map: &HashMap<WorkerId, WorkerNode>,
271 ensembles: &[NoShuffleEnsemble],
272 database_resource_group: &str,
273) -> MetaResult<RenderResult> {
274 let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
277 HashMap::new();
278
279 for ensemble in ensembles {
280 let existing_fragment_ids: Vec<FragmentId> = ensemble
285 .component_fragments()
286 .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
287 .collect();
288
289 let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
290 let template = EnsembleActorTemplate::from_existing_inflight_fragment(
291 database_info.fragment(first_existing),
292 );
293
294 for &other_fragment_id in &existing_fragment_ids[1..] {
297 let other = EnsembleActorTemplate::from_existing_inflight_fragment(
298 database_info.fragment(other_fragment_id),
299 );
300 template.assert_aligned_with(&other, first_existing, other_fragment_id);
301 }
302
303 template
304 } else {
305 let first_component = ensemble
307 .component_fragments()
308 .next()
309 .expect("ensemble must have at least one component");
310 let fragment = &fragments.inner.fragments[&first_component];
311 let distribution_type: DistributionType = fragment.distribution_type.into();
312 let vnode_count = fragment.vnode_count();
313
314 for fragment_id in ensemble.component_fragments() {
316 let f = &fragments.inner.fragments[&fragment_id];
317 assert_eq!(
318 vnode_count,
319 f.vnode_count(),
320 "component fragments {} and {} in the same no-shuffle ensemble have \
321 different vnode counts: {} vs {}",
322 first_component,
323 fragment_id,
324 vnode_count,
325 f.vnode_count(),
326 );
327 }
328
329 EnsembleActorTemplate::render_new(
330 streaming_job_model,
331 worker_map,
332 None,
333 database_resource_group.to_owned(),
334 distribution_type,
335 vnode_count,
336 )?
337 };
338
339 for fragment_id in ensemble.component_fragments() {
341 if !fragments.inner.fragments.contains_key(&fragment_id) {
342 continue; }
344 let fragment = &fragments.inner.fragments[&fragment_id];
345 let distribution_type: DistributionType = fragment.distribution_type.into();
346 let aligner =
347 ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
348 let assignments = aligner.align_component_actor(distribution_type);
349 actor_assignments.insert(fragment_id, assignments);
350 }
351 }
352
353 let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
355 let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
356
357 for (fragment_id, assignments) in &actor_assignments {
358 let mut actors = Vec::with_capacity(assignments.len());
359 for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
360 result_actor_location.insert(actor_id, *worker_id);
361 actors.push(StreamActor {
362 actor_id,
363 fragment_id: *fragment_id,
364 vnode_bitmap: vnode_bitmap.clone(),
365 mview_definition: definition.to_owned(),
366 expr_context: Some(ctx.to_expr_context()),
367 config_override: ctx.config_override.clone(),
368 });
369 }
370 result_stream_actors.insert(*fragment_id, actors);
371 }
372
373 Ok(RenderResult {
374 stream_actors: result_stream_actors,
375 actor_location: result_actor_location,
376 })
377}
378impl DatabaseCheckpointControl {
379 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
381 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
382 let node_actors =
383 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
384 (table_ids_to_commit, node_actors)
385 }
386
387 fn apply_simple_command(
391 &self,
392 mutation: Option<Mutation>,
393 command_name: &'static str,
394 ) -> ApplyCommandResult {
395 let (table_ids, node_actors) = self.collect_base_info();
396 (
397 mutation,
398 table_ids,
399 None,
400 node_actors,
401 PostCollectCommand::Command(command_name.to_owned()),
402 )
403 }
404
405 pub(super) fn apply_command(
408 &mut self,
409 command: Option<Command>,
410 notifiers: &mut Vec<Notifier>,
411 barrier_info: BarrierInfo,
412 partial_graph_manager: &mut PartialGraphManager,
413 hummock_version_stats: &HummockVersionStats,
414 worker_nodes: &HashMap<WorkerId, WorkerNode>,
415 ) -> MetaResult<ApplyCommandInfo> {
416 debug_assert!(
417 !matches!(
418 command,
419 Some(Command::RescheduleIntent {
420 reschedule_plan: None,
421 ..
422 })
423 ),
424 "reschedule intent must be resolved before apply"
425 );
426 if matches!(
427 command,
428 Some(Command::RescheduleIntent {
429 reschedule_plan: None,
430 ..
431 })
432 ) {
433 bail!("reschedule intent must be resolved before apply");
434 }
435
436 fn resolve_source_splits(
441 info: &CreateStreamingJobCommandInfo,
442 render_result: &RenderResult,
443 actor_no_shuffle: &ActorNewNoShuffle,
444 database_info: &InflightDatabaseInfo,
445 ) -> MetaResult<SplitAssignment> {
446 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
447 .stream_actors
448 .iter()
449 .map(|(fragment_id, actors)| {
450 (
451 *fragment_id,
452 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
453 )
454 })
455 .collect();
456 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
457 &info.stream_job_fragments,
458 &info.init_split_assignment,
459 &fragment_actor_ids,
460 )?;
461 resolved.extend(SourceManager::resolve_backfill_splits(
462 &info.stream_job_fragments,
463 actor_no_shuffle,
464 |fragment_id, actor_id| {
465 database_info
466 .fragment(fragment_id)
467 .actors
468 .get(&actor_id)
469 .map(|info| info.splits.clone())
470 },
471 )?);
472 Ok(resolved)
473 }
474
475 let mut throttle_for_creating_jobs: Option<(
477 HashSet<JobId>,
478 HashMap<FragmentId, ThrottleConfig>,
479 )> = None;
480
481 let (
485 mutation,
486 mut table_ids_to_commit,
487 mut actors_to_create,
488 mut node_actors,
489 post_collect_command,
490 ) = match command {
491 None => self.apply_simple_command(None, "barrier"),
492 Some(Command::CreateStreamingJob {
493 mut info,
494 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
495 cross_db_snapshot_backfill_info,
496 }) => {
497 let ensembles = resolve_no_shuffle_ensembles(
498 &info.stream_job_fragments,
499 &info.upstream_fragment_downstreams,
500 )?;
501 let actors = render_actors(
502 &info.stream_job_fragments,
503 &self.database_info,
504 &info.definition,
505 &info.stream_job_fragments.inner.ctx,
506 &info.streaming_job_model,
507 partial_graph_manager
508 .control_stream_manager()
509 .env
510 .actor_id_generator(),
511 worker_nodes,
512 &ensembles,
513 &info.database_resource_group,
514 )?;
515 {
516 assert!(!self.state.is_paused());
517 let snapshot_epoch = barrier_info.prev_epoch();
518 for snapshot_backfill_epoch in snapshot_backfill_info
520 .upstream_mv_table_id_to_backfill_epoch
521 .values_mut()
522 {
523 assert_eq!(
524 snapshot_backfill_epoch.replace(snapshot_epoch),
525 None,
526 "must not set previously"
527 );
528 }
529 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
530 fill_snapshot_backfill_epoch(
531 &mut fragment.nodes,
532 Some(&snapshot_backfill_info),
533 &cross_db_snapshot_backfill_info,
534 )?;
535 }
536 let job_id = info.stream_job_fragments.stream_job_id();
537 let snapshot_backfill_upstream_tables = snapshot_backfill_info
538 .upstream_mv_table_id_to_backfill_epoch
539 .keys()
540 .cloned()
541 .collect();
542
543 let mut edges = self.database_info.build_edge(
545 Some((&info, true)),
546 None,
547 None,
548 partial_graph_manager.control_stream_manager(),
549 &actors.stream_actors,
550 &actors.actor_location,
551 );
552 let resolved_split_assignment = resolve_source_splits(
554 &info,
555 &actors,
556 edges.actor_new_no_shuffle(),
557 &self.database_info,
558 )?;
559
560 let Entry::Vacant(entry) =
561 self.independent_checkpoint_job_controls.entry(job_id)
562 else {
563 panic!("duplicated creating snapshot backfill job {job_id}");
564 };
565
566 let job = CreatingStreamingJobControl::new(
567 entry,
568 CreateSnapshotBackfillJobCommandInfo {
569 info: info.clone(),
570 snapshot_backfill_info: snapshot_backfill_info.clone(),
571 cross_db_snapshot_backfill_info,
572 resolved_split_assignment: resolved_split_assignment.clone(),
573 refresh_interval_sec: None,
574 },
575 take(notifiers),
576 snapshot_backfill_upstream_tables,
577 snapshot_epoch,
578 hummock_version_stats,
579 partial_graph_manager,
580 &mut edges,
581 &resolved_split_assignment,
582 &actors,
583 )?;
584
585 if let Some(fragment_infos) = job.fragment_infos() {
586 self.database_info.shared_actor_infos.upsert(
587 self.database_id,
588 fragment_infos.values().map(|f| (f, job_id)),
589 );
590 }
591
592 for upstream_mv_table_id in snapshot_backfill_info
593 .upstream_mv_table_id_to_backfill_epoch
594 .keys()
595 {
596 self.database_info.register_subscriber(
597 upstream_mv_table_id.as_job_id(),
598 info.streaming_job.id().as_subscriber_id(),
599 SubscriberType::SnapshotBackfill,
600 );
601 }
602
603 let mutation = Command::create_streaming_job_to_mutation(
604 &info,
605 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
606 self.state.is_paused(),
607 &mut edges,
608 partial_graph_manager.control_stream_manager(),
609 None,
610 &resolved_split_assignment,
611 &actors.stream_actors,
612 &actors.actor_location,
613 )?;
614
615 let (table_ids, node_actors) = self.collect_base_info();
616 (
617 Some(mutation),
618 table_ids,
619 None,
620 node_actors,
621 PostCollectCommand::barrier(),
622 )
623 }
624 }
625 Some(Command::CreateStreamingJob {
626 mut info,
627 job_type: CreateStreamingJobType::BatchRefresh(mut batch_refresh_info),
628 cross_db_snapshot_backfill_info,
629 }) => {
630 {
631 if self.state.is_paused() {
632 bail!("cannot create batch refresh job while database barrier is paused");
633 }
634 let snapshot_epoch = barrier_info.prev_epoch();
635 let job_id = info.stream_job_fragments.stream_job_id();
636 let database_id = info.streaming_job.database_id();
637
638 let snapshot_backfill_info = &mut batch_refresh_info.snapshot_backfill_info;
640 for snapshot_backfill_epoch in snapshot_backfill_info
641 .upstream_mv_table_id_to_backfill_epoch
642 .values_mut()
643 {
644 assert_eq!(
645 snapshot_backfill_epoch.replace(snapshot_epoch),
646 None,
647 "must not set previously"
648 );
649 }
650 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
651 fill_snapshot_backfill_epoch(
652 &mut fragment.nodes,
653 Some(snapshot_backfill_info),
654 &cross_db_snapshot_backfill_info,
655 )?;
656 }
657 let snapshot_backfill_upstream_tables: HashSet<TableId> =
658 snapshot_backfill_info
659 .upstream_mv_table_id_to_backfill_epoch
660 .keys()
661 .cloned()
662 .collect();
663
664 let logical = BatchRefreshLogicalFragments {
666 fragments: info
667 .stream_job_fragments
668 .inner
669 .fragments
670 .iter()
671 .map(|(&fid, fragment)| {
672 (
673 fid,
674 LoadedFragment {
675 fragment_id: fid,
676 job_id,
677 fragment_type_mask: fragment.fragment_type_mask,
678 distribution_type: fragment.distribution_type.into(),
679 vnode_count: fragment.vnode_count(),
680 nodes: fragment.nodes.clone(),
681 state_table_ids: fragment
682 .state_table_ids
683 .iter()
684 .cloned()
685 .collect(),
686 parallelism: None,
687 },
688 )
689 })
690 .collect(),
691 downstreams: info.stream_job_fragments.downstreams.clone(),
692 };
693
694 assert!(
698 !self
699 .independent_checkpoint_job_controls
700 .contains_key(&job_id),
701 "duplicated creating batch refresh job {job_id}"
702 );
703
704 let snapshot_backfill_info_clone =
705 batch_refresh_info.snapshot_backfill_info.clone();
706 let refresh_interval_sec = batch_refresh_info.refresh_interval_sec;
707
708 let subscriber_id =
712 info.stream_job_fragments.stream_job_id().as_subscriber_id();
713 let mutation = Mutation::Add(AddMutation {
714 actor_dispatchers: Default::default(),
715 added_actors: Default::default(),
716 actor_splits: Default::default(),
717 pause: false,
718 subscriptions_to_add: snapshot_backfill_info_clone
719 .upstream_mv_table_id_to_backfill_epoch
720 .keys()
721 .map(|table_id| PbSubscriptionUpstreamInfo {
722 subscriber_id,
723 upstream_mv_table_id: *table_id,
724 })
725 .collect(),
726 backfill_nodes_to_pause: Default::default(),
727 actor_cdc_table_snapshot_splits: None,
728 new_upstream_sinks: Default::default(),
729 });
730
731 let job = BatchRefreshJobCheckpointControl::new(
732 database_id,
733 job_id,
734 CreateSnapshotBackfillJobCommandInfo {
735 info: info.clone(),
736 snapshot_backfill_info: snapshot_backfill_info_clone.clone(),
737 cross_db_snapshot_backfill_info,
738 resolved_split_assignment: Default::default(),
739 refresh_interval_sec: Some(refresh_interval_sec),
740 },
741 take(notifiers),
742 snapshot_backfill_upstream_tables,
743 snapshot_epoch,
744 hummock_version_stats,
745 partial_graph_manager,
746 &logical,
747 worker_nodes,
748 refresh_interval_sec,
749 )?;
750
751 if let Some(fragment_infos) = job.fragment_infos() {
752 self.database_info.shared_actor_infos.upsert(
753 self.database_id,
754 fragment_infos.values().map(|f| (f, job_id)),
755 );
756 }
757
758 self.independent_checkpoint_job_controls
759 .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
760
761 for upstream_mv_table_id in snapshot_backfill_info_clone
763 .upstream_mv_table_id_to_backfill_epoch
764 .keys()
765 {
766 self.database_info.register_subscriber(
767 upstream_mv_table_id.as_job_id(),
768 info.streaming_job.id().as_subscriber_id(),
769 SubscriberType::SnapshotBackfill,
770 );
771 }
772
773 let (table_ids, node_actors) = self.collect_base_info();
774 (
775 Some(mutation),
776 table_ids,
777 None,
778 node_actors,
779 PostCollectCommand::barrier(),
780 )
781 }
782 }
783 Some(Command::CreateStreamingJob {
784 mut info,
785 job_type,
786 cross_db_snapshot_backfill_info,
787 }) => {
788 let ensembles = resolve_no_shuffle_ensembles(
789 &info.stream_job_fragments,
790 &info.upstream_fragment_downstreams,
791 )?;
792 let actors = render_actors(
793 &info.stream_job_fragments,
794 &self.database_info,
795 &info.definition,
796 &info.stream_job_fragments.inner.ctx,
797 &info.streaming_job_model,
798 partial_graph_manager
799 .control_stream_manager()
800 .env
801 .actor_id_generator(),
802 worker_nodes,
803 &ensembles,
804 &info.database_resource_group,
805 )?;
806 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
807 fill_snapshot_backfill_epoch(
808 &mut fragment.nodes,
809 None,
810 &cross_db_snapshot_backfill_info,
811 )?;
812 }
813
814 let new_upstream_sink =
816 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
817 Some(ctx)
818 } else {
819 None
820 };
821
822 let mut edges = self.database_info.build_edge(
823 Some((&info, false)),
824 None,
825 new_upstream_sink,
826 partial_graph_manager.control_stream_manager(),
827 &actors.stream_actors,
828 &actors.actor_location,
829 );
830 let resolved_split_assignment = resolve_source_splits(
832 &info,
833 &actors,
834 edges.actor_new_no_shuffle(),
835 &self.database_info,
836 )?;
837
838 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
840 let (fragment, _) =
841 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
842 .expect("should have parallel cdc fragment");
843 Some(CdcTableBackfillTracker::new(
844 fragment.fragment_id,
845 splits.clone(),
846 ))
847 } else {
848 None
849 };
850 self.database_info
851 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
852 self.database_info.pre_apply_new_fragments(
853 info.stream_job_fragments
854 .new_fragment_info(
855 &actors.stream_actors,
856 &actors.actor_location,
857 &resolved_split_assignment,
858 )
859 .map(|(fragment_id, fragment_infos)| {
860 (fragment_id, info.streaming_job.id(), fragment_infos)
861 }),
862 );
863 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
864 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
865 self.database_info.pre_apply_add_node_upstream(
866 downstream_fragment_id,
867 &PbUpstreamSinkInfo {
868 upstream_fragment_id: ctx.sink_fragment_id,
869 sink_output_schema: ctx.sink_output_fields.clone(),
870 project_exprs: ctx.project_exprs.clone(),
871 },
872 );
873 }
874
875 let (table_ids, node_actors) = self.collect_base_info();
876
877 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
879 &info,
880 &mut edges,
881 &actors.stream_actors,
882 &actors.actor_location,
883 ));
884
885 let actor_cdc_table_snapshot_splits = self
887 .database_info
888 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
889
890 let is_currently_paused = self.state.is_paused();
892 let mutation = Command::create_streaming_job_to_mutation(
893 &info,
894 &job_type,
895 is_currently_paused,
896 &mut edges,
897 partial_graph_manager.control_stream_manager(),
898 actor_cdc_table_snapshot_splits,
899 &resolved_split_assignment,
900 &actors.stream_actors,
901 &actors.actor_location,
902 )?;
903
904 (
905 Some(mutation),
906 table_ids,
907 actors_to_create,
908 node_actors,
909 PostCollectCommand::CreateStreamingJob {
910 info,
911 job_type,
912 cross_db_snapshot_backfill_info,
913 resolved_split_assignment,
914 },
915 )
916 }
917
918 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
919
920 Some(Command::Pause) => {
921 let prev_is_paused = self.state.is_paused();
922 self.state.set_is_paused(true);
923 let mutation = Command::pause_to_mutation(prev_is_paused);
924 let (table_ids, node_actors) = self.collect_base_info();
925 (
926 mutation,
927 table_ids,
928 None,
929 node_actors,
930 PostCollectCommand::Command("Pause".to_owned()),
931 )
932 }
933
934 Some(Command::Resume) => {
935 let prev_is_paused = self.state.is_paused();
936 self.state.set_is_paused(false);
937 let mutation = Command::resume_to_mutation(prev_is_paused);
938 let (table_ids, node_actors) = self.collect_base_info();
939 (
940 mutation,
941 table_ids,
942 None,
943 node_actors,
944 PostCollectCommand::Command("Resume".to_owned()),
945 )
946 }
947
948 Some(Command::Throttle { jobs, config }) => {
949 let mutation = Some(Command::throttle_to_mutation(&config));
950 for (fragment_id, throttle_config) in &config {
951 self.database_info
952 .pre_apply_throttle(*fragment_id, throttle_config);
953 }
954 throttle_for_creating_jobs = Some((jobs, config));
955 self.apply_simple_command(mutation, "Throttle")
956 }
957
958 Some(Command::DropStreamingJobs {
959 streaming_job_ids,
960 unregistered_state_table_ids: _,
961 dropped_sink_fragment_by_targets,
962 }) => {
963 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
965 self.database_info
966 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
967 }
968
969 let (table_ids, node_actors) = self.collect_base_info();
970
971 let mut actors = Vec::new();
972 for job_id in streaming_job_ids {
973 let Some(job) = self.database_info.post_apply_remove_job(job_id) else {
974 warn!(
975 %job_id,
976 "skip drop payload for streaming job that has already been removed from barrier worker"
977 );
978 continue;
979 };
980
981 for fragment in job.fragment_infos.values() {
982 actors.extend(fragment.actors.keys().copied());
983 }
984 }
985
986 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
987 &actors,
988 &dropped_sink_fragment_by_targets,
989 ));
990 (
991 mutation,
992 table_ids,
993 None,
994 node_actors,
995 PostCollectCommand::DropStreamingJobs,
996 )
997 }
998
999 Some(Command::RescheduleIntent {
1000 reschedule_plan, ..
1001 }) => {
1002 let ReschedulePlan {
1003 reschedules,
1004 fragment_actors,
1005 } = reschedule_plan
1006 .as_ref()
1007 .expect("reschedule intent should be resolved in global barrier worker");
1008
1009 for (fragment_id, reschedule) in reschedules {
1011 self.database_info.pre_apply_reschedule(
1012 *fragment_id,
1013 reschedule
1014 .added_actors
1015 .iter()
1016 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
1017 actors.iter().map(|actor_id| {
1018 (
1019 *actor_id,
1020 InflightActorInfo {
1021 worker_id: *node_id,
1022 vnode_bitmap: reschedule
1023 .newly_created_actors
1024 .get(actor_id)
1025 .expect("should exist")
1026 .0
1027 .0
1028 .vnode_bitmap
1029 .clone(),
1030 splits: reschedule
1031 .actor_splits
1032 .get(actor_id)
1033 .cloned()
1034 .unwrap_or_default(),
1035 },
1036 )
1037 })
1038 })
1039 .collect(),
1040 reschedule
1041 .vnode_bitmap_updates
1042 .iter()
1043 .filter(|(actor_id, _)| {
1044 !reschedule.newly_created_actors.contains_key(*actor_id)
1045 })
1046 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
1047 .collect(),
1048 reschedule.actor_splits.clone(),
1049 );
1050 }
1051
1052 let (table_ids, node_actors) = self.collect_base_info();
1053
1054 let actors_to_create = Some(Command::reschedule_actors_to_create(
1056 reschedules,
1057 fragment_actors,
1058 &self.database_info,
1059 partial_graph_manager.control_stream_manager(),
1060 ));
1061
1062 self.database_info
1064 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
1065 (
1066 *fragment_id,
1067 reschedule.removed_actors.iter().cloned().collect(),
1068 )
1069 }));
1070
1071 let mutation = Command::reschedule_to_mutation(
1073 reschedules,
1074 fragment_actors,
1075 partial_graph_manager.control_stream_manager(),
1076 &mut self.database_info,
1077 )?;
1078
1079 let reschedules = reschedule_plan
1080 .expect("reschedule intent should be resolved in global barrier worker")
1081 .reschedules;
1082 (
1083 mutation,
1084 table_ids,
1085 actors_to_create,
1086 node_actors,
1087 PostCollectCommand::Reschedule { reschedules },
1088 )
1089 }
1090
1091 Some(Command::ReplaceStreamJob(plan)) => {
1092 let ensembles = resolve_no_shuffle_ensembles(
1093 &plan.new_fragments,
1094 &plan.upstream_fragment_downstreams,
1095 )?;
1096 let mut render_result = render_actors(
1097 &plan.new_fragments,
1098 &self.database_info,
1099 "", &plan.new_fragments.inner.ctx,
1101 &plan.streaming_job_model,
1102 partial_graph_manager
1103 .control_stream_manager()
1104 .env
1105 .actor_id_generator(),
1106 worker_nodes,
1107 &ensembles,
1108 &plan.database_resource_group,
1109 )?;
1110
1111 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1114 let actor_id_counter = partial_graph_manager
1115 .control_stream_manager()
1116 .env
1117 .actor_id_generator();
1118 for sink_ctx in sinks {
1119 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
1120 let original_frag_info = self.database_info.fragment(original_fragment_id);
1121 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
1122 original_frag_info,
1123 );
1124 let new_aligner = ComponentFragmentAligner::new_persistent(
1125 &actor_template,
1126 actor_id_counter,
1127 );
1128 let distribution_type: DistributionType =
1129 sink_ctx.new_fragment.distribution_type.into();
1130 let actor_assignments =
1131 new_aligner.align_component_actor(distribution_type);
1132 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
1133 let mut actors = Vec::with_capacity(actor_assignments.len());
1134 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
1135 render_result.actor_location.insert(actor_id, *worker_id);
1136 actors.push(StreamActor {
1137 actor_id,
1138 fragment_id: new_fragment_id,
1139 vnode_bitmap: vnode_bitmap.clone(),
1140 mview_definition: String::new(),
1141 expr_context: Some(sink_ctx.ctx.to_expr_context()),
1142 config_override: sink_ctx.ctx.config_override.clone(),
1143 });
1144 }
1145 render_result.stream_actors.insert(new_fragment_id, actors);
1146 }
1147 }
1148
1149 let mut edges = self.database_info.build_edge(
1151 None,
1152 Some(&plan),
1153 None,
1154 partial_graph_manager.control_stream_manager(),
1155 &render_result.stream_actors,
1156 &render_result.actor_location,
1157 );
1158
1159 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1161 .stream_actors
1162 .iter()
1163 .map(|(fragment_id, actors)| {
1164 (
1165 *fragment_id,
1166 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1167 )
1168 })
1169 .collect();
1170 let resolved_split_assignment = match &plan.split_plan {
1171 ReplaceJobSplitPlan::Discovered(discovered) => {
1172 SourceManager::resolve_fragment_to_actor_splits(
1173 &plan.new_fragments,
1174 discovered,
1175 &fragment_actor_ids,
1176 )?
1177 }
1178 ReplaceJobSplitPlan::AlignFromPrevious => {
1179 SourceManager::resolve_replace_source_splits(
1180 &plan.new_fragments,
1181 &plan.replace_upstream,
1182 edges.actor_new_no_shuffle(),
1183 |_fragment_id, actor_id| {
1184 self.database_info.fragment_infos().find_map(|fragment| {
1185 fragment
1186 .actors
1187 .get(&actor_id)
1188 .map(|info| info.splits.clone())
1189 })
1190 },
1191 )?
1192 }
1193 };
1194
1195 self.database_info.pre_apply_new_fragments(
1197 plan.new_fragments
1198 .new_fragment_info(
1199 &render_result.stream_actors,
1200 &render_result.actor_location,
1201 &resolved_split_assignment,
1202 )
1203 .map(|(fragment_id, new_fragment)| {
1204 (fragment_id, plan.streaming_job.id(), new_fragment)
1205 }),
1206 );
1207 for (fragment_id, replace_map) in &plan.replace_upstream {
1208 self.database_info
1209 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1210 }
1211 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1212 self.database_info
1213 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1214 (
1215 sink.new_fragment.fragment_id,
1216 sink.original_sink.id.as_job_id(),
1217 sink.new_fragment_info(
1218 &render_result.stream_actors,
1219 &render_result.actor_location,
1220 ),
1221 )
1222 }));
1223 }
1224
1225 let (table_ids, node_actors) = self.collect_base_info();
1226
1227 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1229 &plan,
1230 &mut edges,
1231 &self.database_info,
1232 &render_result.stream_actors,
1233 &render_result.actor_location,
1234 ));
1235
1236 let mutation = Command::replace_stream_job_to_mutation(
1239 &plan,
1240 &mut edges,
1241 &mut self.database_info,
1242 &resolved_split_assignment,
1243 )?;
1244
1245 {
1247 let mut fragment_ids_to_remove: Vec<_> = plan
1248 .old_fragments
1249 .fragments
1250 .values()
1251 .map(|f| f.fragment_id)
1252 .collect();
1253 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1254 fragment_ids_to_remove
1255 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1256 }
1257 self.database_info
1258 .post_apply_remove_fragments(fragment_ids_to_remove);
1259 }
1260
1261 (
1262 mutation,
1263 table_ids,
1264 actors_to_create,
1265 node_actors,
1266 PostCollectCommand::ReplaceStreamJob {
1267 plan,
1268 resolved_split_assignment,
1269 },
1270 )
1271 }
1272
1273 Some(Command::SourceChangeSplit(split_state)) => {
1274 self.database_info.pre_apply_split_assignments(
1276 split_state
1277 .split_assignment
1278 .iter()
1279 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1280 );
1281
1282 let mutation = Some(Command::source_change_split_to_mutation(
1283 &split_state.split_assignment,
1284 ));
1285 let (table_ids, node_actors) = self.collect_base_info();
1286 (
1287 mutation,
1288 table_ids,
1289 None,
1290 node_actors,
1291 PostCollectCommand::SourceChangeSplit {
1292 split_assignment: split_state.split_assignment,
1293 },
1294 )
1295 }
1296
1297 Some(Command::CreateSubscription {
1298 subscription_id,
1299 upstream_mv_table_id,
1300 retention_second,
1301 }) => {
1302 self.database_info.register_subscriber(
1303 upstream_mv_table_id.as_job_id(),
1304 subscription_id.as_subscriber_id(),
1305 SubscriberType::Subscription(retention_second),
1306 );
1307 let mutation = Some(Command::create_subscription_to_mutation(
1308 upstream_mv_table_id,
1309 subscription_id,
1310 ));
1311 let (table_ids, node_actors) = self.collect_base_info();
1312 (
1313 mutation,
1314 table_ids,
1315 None,
1316 node_actors,
1317 PostCollectCommand::CreateSubscription { subscription_id },
1318 )
1319 }
1320
1321 Some(Command::DropSubscription {
1322 subscription_id,
1323 upstream_mv_table_id,
1324 }) => {
1325 if self
1326 .database_info
1327 .unregister_subscriber(
1328 upstream_mv_table_id.as_job_id(),
1329 subscription_id.as_subscriber_id(),
1330 )
1331 .is_none()
1332 {
1333 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1334 }
1335 let mutation = Some(Command::drop_subscription_to_mutation(
1336 upstream_mv_table_id,
1337 subscription_id,
1338 ));
1339 let (table_ids, node_actors) = self.collect_base_info();
1340 (
1341 mutation,
1342 table_ids,
1343 None,
1344 node_actors,
1345 PostCollectCommand::Command("DropSubscription".to_owned()),
1346 )
1347 }
1348
1349 Some(Command::AlterSubscriptionRetention {
1350 subscription_id,
1351 upstream_mv_table_id,
1352 retention_second,
1353 }) => {
1354 self.database_info.update_subscription_retention(
1355 upstream_mv_table_id.as_job_id(),
1356 subscription_id.as_subscriber_id(),
1357 retention_second,
1358 );
1359 self.apply_simple_command(None, "AlterSubscriptionRetention")
1360 }
1361
1362 Some(Command::ConnectorPropsChange(config)) => {
1363 let mutation = Some(Command::connector_props_change_to_mutation(&config));
1364 let (table_ids, node_actors) = self.collect_base_info();
1365 (
1366 mutation,
1367 table_ids,
1368 None,
1369 node_actors,
1370 PostCollectCommand::ConnectorPropsChange(config),
1371 )
1372 }
1373
1374 Some(Command::Refresh {
1375 table_id,
1376 associated_source_id,
1377 }) => {
1378 let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1379 self.apply_simple_command(mutation, "Refresh")
1380 }
1381
1382 Some(Command::ListFinish {
1383 table_id: _,
1384 associated_source_id,
1385 }) => {
1386 let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1387 self.apply_simple_command(mutation, "ListFinish")
1388 }
1389
1390 Some(Command::LoadFinish {
1391 table_id: _,
1392 associated_source_id,
1393 }) => {
1394 let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1395 self.apply_simple_command(mutation, "LoadFinish")
1396 }
1397
1398 Some(Command::ResetSource { source_id }) => {
1399 let mutation = Some(Command::reset_source_to_mutation(source_id));
1400 self.apply_simple_command(mutation, "ResetSource")
1401 }
1402
1403 Some(Command::ResumeBackfill { target }) => {
1404 let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1405 let (table_ids, node_actors) = self.collect_base_info();
1406 (
1407 mutation,
1408 table_ids,
1409 None,
1410 node_actors,
1411 PostCollectCommand::ResumeBackfill { target },
1412 )
1413 }
1414
1415 Some(Command::InjectSourceOffsets {
1416 source_id,
1417 split_offsets,
1418 }) => {
1419 let mutation = Some(Command::inject_source_offsets_to_mutation(
1420 source_id,
1421 &split_offsets,
1422 ));
1423 self.apply_simple_command(mutation, "InjectSourceOffsets")
1424 }
1425 };
1426
1427 let mut finished_snapshot_backfill_jobs = HashSet::new();
1428 let mutation = match mutation {
1429 Some(mutation) => Some(mutation),
1430 None => {
1431 let mut finished_snapshot_backfill_job_info = HashMap::new();
1432 if barrier_info.kind.is_checkpoint() {
1433 for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1434 if let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1435 job
1436 && creating_job.should_merge_to_upstream()
1437 {
1438 let info = creating_job
1439 .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1440 finished_snapshot_backfill_job_info
1441 .try_insert(job_id, info)
1442 .expect("non-duplicated");
1443 }
1444 }
1445 }
1446
1447 if !finished_snapshot_backfill_job_info.is_empty() {
1448 let actors_to_create = actors_to_create.get_or_insert_default();
1449 let mut subscriptions_to_drop = vec![];
1450 let mut dispatcher_update = vec![];
1451 let mut actor_splits = HashMap::new();
1452 for (job_id, info) in finished_snapshot_backfill_job_info {
1453 finished_snapshot_backfill_jobs.insert(job_id);
1454 subscriptions_to_drop.extend(
1455 info.snapshot_backfill_upstream_tables.iter().map(
1456 |upstream_table_id| PbSubscriptionUpstreamInfo {
1457 subscriber_id: job_id.as_subscriber_id(),
1458 upstream_mv_table_id: *upstream_table_id,
1459 },
1460 ),
1461 );
1462 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1463 assert_matches!(
1464 self.database_info.unregister_subscriber(
1465 upstream_mv_table_id.as_job_id(),
1466 job_id.as_subscriber_id()
1467 ),
1468 Some(SubscriberType::SnapshotBackfill)
1469 );
1470 }
1471
1472 table_ids_to_commit.extend(
1473 info.fragment_infos
1474 .values()
1475 .flat_map(|fragment| fragment.state_table_ids.iter())
1476 .copied(),
1477 );
1478
1479 let actor_len = info
1480 .fragment_infos
1481 .values()
1482 .map(|fragment| fragment.actors.len() as u64)
1483 .sum();
1484 let id_gen = GlobalActorIdGen::new(
1485 partial_graph_manager
1486 .control_stream_manager()
1487 .env
1488 .actor_id_generator(),
1489 actor_len,
1490 );
1491 let mut next_local_actor_id = 0;
1492 let actor_mapping: HashMap<_, _> = info
1494 .fragment_infos
1495 .values()
1496 .flat_map(|fragment| fragment.actors.keys())
1497 .map(|old_actor_id| {
1498 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1499 next_local_actor_id += 1;
1500 (*old_actor_id, new_actor_id.as_global_id())
1501 })
1502 .collect();
1503 let actor_mapping = &actor_mapping;
1504 let new_stream_actors: HashMap<_, _> = info
1505 .stream_actors
1506 .into_iter()
1507 .map(|(old_actor_id, mut actor)| {
1508 let new_actor_id = actor_mapping[&old_actor_id];
1509 actor.actor_id = new_actor_id;
1510 (new_actor_id, actor)
1511 })
1512 .collect();
1513 let new_fragment_info: HashMap<_, _> = info
1514 .fragment_infos
1515 .into_iter()
1516 .map(|(fragment_id, mut fragment)| {
1517 let actors = take(&mut fragment.actors);
1518 fragment.actors = actors
1519 .into_iter()
1520 .map(|(old_actor_id, actor)| {
1521 let new_actor_id = actor_mapping[&old_actor_id];
1522 (new_actor_id, actor)
1523 })
1524 .collect();
1525 (fragment_id, fragment)
1526 })
1527 .collect();
1528 actor_splits.extend(
1529 new_fragment_info
1530 .values()
1531 .flat_map(|fragment| &fragment.actors)
1532 .map(|(actor_id, actor)| {
1533 (
1534 *actor_id,
1535 ConnectorSplits {
1536 splits: actor
1537 .splits
1538 .iter()
1539 .map(ConnectorSplit::from)
1540 .collect(),
1541 },
1542 )
1543 }),
1544 );
1545 let partial_graph_id = to_partial_graph_id(self.database_id, None);
1547 let mut edge_builder = FragmentEdgeBuilder::new(
1548 info.upstream_fragment_downstreams
1549 .keys()
1550 .map(|upstream_fragment_id| {
1551 self.database_info.fragment(*upstream_fragment_id)
1552 })
1553 .chain(new_fragment_info.values())
1554 .map(|fragment| {
1555 (
1556 fragment.fragment_id,
1557 EdgeBuilderFragmentInfo::from_inflight(
1558 fragment,
1559 partial_graph_id,
1560 partial_graph_manager.control_stream_manager(),
1561 ),
1562 )
1563 }),
1564 );
1565 edge_builder.add_relations(&info.upstream_fragment_downstreams);
1566 edge_builder.add_relations(&info.downstreams);
1567 let mut edges = edge_builder.build();
1568 let new_actors_to_create = edges.collect_actors_to_create(
1569 new_fragment_info.values().map(|fragment| {
1570 (
1571 fragment.fragment_id,
1572 &fragment.nodes,
1573 fragment.actors.iter().map(|(actor_id, actor)| {
1574 (&new_stream_actors[actor_id], actor.worker_id)
1575 }),
1576 [], )
1578 }),
1579 );
1580 dispatcher_update.extend(
1581 info.upstream_fragment_downstreams.keys().flat_map(
1582 |upstream_fragment_id| {
1583 let new_actor_dispatchers = edges
1584 .dispatchers
1585 .remove(upstream_fragment_id)
1586 .expect("should exist");
1587 new_actor_dispatchers.into_iter().flat_map(
1588 |(upstream_actor_id, dispatchers)| {
1589 dispatchers.into_iter().map(move |dispatcher| {
1590 PbDispatcherUpdate {
1591 actor_id: upstream_actor_id,
1592 dispatcher_id: dispatcher.dispatcher_id,
1593 hash_mapping: dispatcher.hash_mapping,
1594 removed_downstream_actor_id: dispatcher
1595 .downstream_actor_id
1596 .iter()
1597 .map(|new_downstream_actor_id| {
1598 actor_mapping
1599 .iter()
1600 .find_map(
1601 |(old_actor_id, new_actor_id)| {
1602 (new_downstream_actor_id
1603 == new_actor_id)
1604 .then_some(*old_actor_id)
1605 },
1606 )
1607 .expect("should exist")
1608 })
1609 .collect(),
1610 added_downstream_actor_id: dispatcher
1611 .downstream_actor_id,
1612 }
1613 })
1614 },
1615 )
1616 },
1617 ),
1618 );
1619 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1620 for (worker_id, worker_actors) in new_actors_to_create {
1621 node_actors.entry(worker_id).or_default().extend(
1622 worker_actors.values().flat_map(|(_, actors, _)| {
1623 actors.iter().map(|(actor, _, _)| actor.actor_id)
1624 }),
1625 );
1626 actors_to_create
1627 .entry(worker_id)
1628 .or_default()
1629 .extend(worker_actors);
1630 }
1631 self.database_info.add_existing(InflightStreamingJobInfo {
1632 job_id,
1633 fragment_infos: new_fragment_info,
1634 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
1636 cdc_table_backfill_tracker: None, });
1638 }
1639
1640 Some(PbMutation::Update(PbUpdateMutation {
1641 dispatcher_update,
1642 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
1646 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
1650 }))
1651 } else {
1652 let fragment_ids = self.database_info.take_pending_backfill_nodes();
1653 if fragment_ids.is_empty() {
1654 None
1655 } else {
1656 Some(PbMutation::StartFragmentBackfill(
1657 PbStartFragmentBackfillMutation { fragment_ids },
1658 ))
1659 }
1660 }
1661 }
1662 };
1663
1664 for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1666 match job {
1667 IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
1668 if !finished_snapshot_backfill_jobs.contains(job_id) {
1669 let throttle_mutation = if let Some((ref jobs, ref config)) =
1670 throttle_for_creating_jobs
1671 && jobs.contains(job_id)
1672 {
1673 assert_eq!(
1674 jobs.len(),
1675 1,
1676 "should not alter rate limit of snapshot backfill job with other jobs"
1677 );
1678 Some((
1679 Mutation::Throttle(ThrottleMutation {
1680 fragment_throttle: config
1681 .iter()
1682 .map(|(fragment_id, config)| (*fragment_id, *config))
1683 .collect(),
1684 }),
1685 take(notifiers),
1686 ))
1687 } else {
1688 None
1689 };
1690 creating_job.on_new_upstream_barrier(
1691 partial_graph_manager,
1692 &barrier_info,
1693 throttle_mutation,
1694 )?;
1695 }
1696 }
1697 IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
1698 batch_refresh_job.on_new_upstream_barrier(
1699 partial_graph_manager,
1700 &barrier_info,
1701 None, )?;
1703 }
1704 }
1705 }
1706
1707 partial_graph_manager.inject_barrier(
1708 to_partial_graph_id(self.database_id, None),
1709 mutation,
1710 &node_actors,
1711 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1712 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1713 actors_to_create,
1714 PartialGraphBarrierInfo::new(
1715 post_collect_command,
1716 barrier_info,
1717 take(notifiers),
1718 table_ids_to_commit,
1719 ),
1720 )?;
1721
1722 Ok(ApplyCommandInfo {
1723 jobs_to_wait: finished_snapshot_backfill_jobs,
1724 })
1725 }
1726}