1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19
20use risingwave_common::bail;
21use risingwave_common::catalog::TableId;
22use risingwave_common::id::JobId;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
27use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
28use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{
31 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
32 PbUpstreamSinkInfo, ThrottleMutation,
33};
34use tracing::warn;
35
36use crate::MetaResult;
37use crate::barrier::cdc_progress::CdcTableBackfillTracker;
38use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
39use crate::barrier::command::{PostCollectCommand, ReschedulePlan};
40use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
41use crate::barrier::edge_builder::FragmentEdgeBuilder;
42use crate::barrier::info::{
43 BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
44};
45use crate::barrier::notifier::Notifier;
46use crate::barrier::partial_graph::PartialGraphManager;
47use crate::barrier::rpc::to_partial_graph_id;
48use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
49use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
50use crate::model::{ActorId, FragmentId, StreamJobActorsToCreate};
51use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
52use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};
53
/// Per-database bookkeeping for the barrier worker: tracks the epoch chain of
/// injected barriers and the pause state toggled by `Pause`/`Resume` commands.
pub(in crate::barrier) struct BarrierWorkerState {
    // The `prev_epoch` of the most recently issued barrier; becomes the next
    // barrier's `prev_epoch` in `next_barrier_info`.
    in_flight_prev_epoch: TracedEpoch,

    // Prev-epochs of barriers issued since the last checkpoint barrier; drained
    // into `BarrierKind::Checkpoint` when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    // Whether the database is currently paused; flipped by `set_is_paused`
    // (driven by the `Pause`/`Resume` commands in `apply_command`).
    is_paused: bool,
}
68
69impl BarrierWorkerState {
70 pub(super) fn new() -> Self {
71 Self {
72 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
73 pending_non_checkpoint_barriers: vec![],
74 is_paused: false,
75 }
76 }
77
78 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
79 Self {
80 in_flight_prev_epoch,
81 pending_non_checkpoint_barriers: vec![],
82 is_paused,
83 }
84 }
85
86 pub fn is_paused(&self) -> bool {
87 self.is_paused
88 }
89
90 fn set_is_paused(&mut self, is_paused: bool) {
91 if self.is_paused != is_paused {
92 tracing::info!(
93 currently_paused = self.is_paused,
94 newly_paused = is_paused,
95 "update paused state"
96 );
97 self.is_paused = is_paused;
98 }
99 }
100
101 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
102 &self.in_flight_prev_epoch
103 }
104
105 pub fn next_barrier_info(
107 &mut self,
108 is_checkpoint: bool,
109 curr_epoch: TracedEpoch,
110 ) -> BarrierInfo {
111 assert!(
112 self.in_flight_prev_epoch.value() < curr_epoch.value(),
113 "curr epoch regress. {} > {}",
114 self.in_flight_prev_epoch.value(),
115 curr_epoch.value()
116 );
117 let prev_epoch = self.in_flight_prev_epoch.clone();
118 self.in_flight_prev_epoch = curr_epoch.clone();
119 self.pending_non_checkpoint_barriers
120 .push(prev_epoch.value().0);
121 let kind = if is_checkpoint {
122 let epochs = take(&mut self.pending_non_checkpoint_barriers);
123 BarrierKind::Checkpoint(epochs)
124 } else {
125 BarrierKind::Barrier
126 };
127 BarrierInfo {
128 prev_epoch,
129 curr_epoch,
130 kind,
131 }
132 }
133}
134
/// Result of `DatabaseCheckpointControl::apply_command`, consumed by the caller
/// after the barrier has been injected.
pub(super) struct ApplyCommandInfo {
    // Max subscription retention per upstream MV table, taken from
    // `database_info.max_subscription_retention()` at apply time.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    // State tables whose data should be committed for this barrier (may be
    // extended with tables of snapshot-backfill jobs finishing at this barrier).
    pub table_ids_to_commit: HashSet<TableId>,
    // Snapshot-backfill jobs that started consuming upstream at this barrier
    // and therefore must be waited on before the barrier completes.
    pub jobs_to_wait: HashSet<JobId>,
    // The command to act upon once the barrier is collected.
    pub command: PostCollectCommand,
}
141
/// Intermediate per-command result produced inside `apply_command`, before the
/// barrier is injected:
/// (mutation to carry on the barrier,
///  state table ids to commit,
///  new actors to create (if any),
///  per-worker actor ids to collect the barrier from,
///  command to act upon after collection).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
151
152impl DatabaseCheckpointControl {
153 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
155 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
156 let node_actors =
157 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
158 (table_ids_to_commit, node_actors)
159 }
160
161 fn apply_simple_command(
165 &self,
166 mutation: Option<Mutation>,
167 command_name: &'static str,
168 ) -> ApplyCommandResult {
169 let (table_ids, node_actors) = self.collect_base_info();
170 (
171 mutation,
172 table_ids,
173 None,
174 node_actors,
175 PostCollectCommand::Command(command_name.to_owned()),
176 )
177 }
178
    /// Applies `command` to the database-level inflight graph state, builds the
    /// barrier mutation, injects the barrier into this database's partial
    /// graph, and forwards the barrier to any in-progress snapshot-backfill
    /// ("creating") jobs.
    ///
    /// Each command arm produces an [`ApplyCommandResult`]; afterwards, if no
    /// command mutation was produced and this is a checkpoint barrier, finished
    /// snapshot-backfill jobs are merged back into the upstream graph via a
    /// synthesized `Update` mutation.
    ///
    /// `notifiers` may be drained (via `take`) when they are handed off to a
    /// creating-job control. `hummock_version_stats` is only consumed by the
    /// snapshot-backfill job creation path.
    pub(super) fn apply_command(
        &mut self,
        command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
        // An unresolved reschedule intent is a caller bug: panic in debug
        // builds, and bail with an error in release builds.
        debug_assert!(
            !matches!(
                command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent must be resolved before apply"
        );
        if matches!(
            command,
            Some(Command::RescheduleIntent {
                reschedule_plan: None,
                ..
            })
        ) {
            bail!("reschedule intent must be resolved before apply");
        }
        // Set only by the `Throttle` arm; consumed later when forwarding the
        // barrier to creating (snapshot-backfill) jobs.
        let mut throttle_for_creating_jobs: Option<(
            HashSet<JobId>,
            HashMap<FragmentId, ThrottleConfig>,
        )> = None;

        let (
            mutation,
            mut table_ids_to_commit,
            mut actors_to_create,
            mut node_actors,
            post_collect_command,
        ) = match command {
            // Plain barrier with no command attached.
            None => self.apply_simple_command(None, "barrier"),
            // Snapshot-backfill job creation: the job runs in its own creating
            // control until it is ready to merge into the upstream graph.
            Some(Command::CreateStreamingJob {
                mut info,
                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
                cross_db_snapshot_backfill_info,
            }) => {
                {
                    assert!(!self.state.is_paused());
                    // The barrier's prev epoch becomes the snapshot epoch that
                    // all upstream MV tables are backfilled at.
                    let snapshot_epoch = barrier_info.prev_epoch();
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(&snapshot_backfill_info),
                            &cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let mut edges = self.database_info.build_edge(
                        Some((&info, true)),
                        None,
                        None,
                        partial_graph_manager.control_stream_manager(),
                    );

                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
                    else {
                        panic!("duplicated creating snapshot backfill job {job_id}");
                    };

                    // Hands off the current notifiers to the creating-job
                    // control, which notifies them as the job progresses.
                    let job = CreatingStreamingJobControl::new(
                        entry,
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info,
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        partial_graph_manager,
                        &mut edges,
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    // The creating job subscribes to each upstream MV so the
                    // upstream retains the epochs it backfills from.
                    for upstream_mv_table_id in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                    {
                        self.database_info.register_subscriber(
                            upstream_mv_table_id.as_job_id(),
                            info.streaming_job.id().as_subscriber_id(),
                            SubscriberType::SnapshotBackfill,
                        );
                    }

                    let mutation = Command::create_streaming_job_to_mutation(
                        &info,
                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                        self.state.is_paused(),
                        &mut edges,
                        partial_graph_manager.control_stream_manager(),
                        None,
                    )?;

                    // Actors of the creating job live in its own partial graph,
                    // so no `actors_to_create` for the upstream graph here, and
                    // the post-collect command is a plain barrier.
                    let (table_ids, node_actors) = self.collect_base_info();
                    (
                        Some(mutation),
                        table_ids,
                        None,
                        node_actors,
                        PostCollectCommand::barrier(),
                    )
                }
            }
            // All other streaming job creation types (normal backfill, sink
            // into table, etc.): actors are created in the main graph directly.
            Some(Command::CreateStreamingJob {
                mut info,
                job_type,
                cross_db_snapshot_backfill_info,
            }) => {
                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                    fill_snapshot_backfill_epoch(
                        &mut fragment.nodes,
                        None,
                        &cross_db_snapshot_backfill_info,
                    )?;
                }

                let new_upstream_sink =
                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                        Some(ctx)
                    } else {
                        None
                    };
                let mut edges = self.database_info.build_edge(
                    Some((&info, false)),
                    None,
                    new_upstream_sink,
                    partial_graph_manager.control_stream_manager(),
                );

                // Track parallel CDC table backfill progress if the job carries
                // CDC snapshot splits.
                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };
                self.database_info
                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
                self.database_info.pre_apply_new_fragments(
                    info.stream_job_fragments
                        .new_fragment_info(&info.init_split_assignment)
                        .map(|(fragment_id, fragment_infos)| {
                            (fragment_id, info.streaming_job.id(), fragment_infos)
                        }),
                );
                // For sink-into-table, the target table's fragment gains the
                // new sink as an upstream.
                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    self.database_info.pre_apply_add_node_upstream(
                        downstream_fragment_id,
                        &PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        },
                    );
                }

                // NOTE: base info is collected *after* the pre_apply_* calls so
                // the new job's tables/actors are included.
                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
                    &info, &mut edges,
                ));

                let actor_cdc_table_snapshot_splits = self
                    .database_info
                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;

                let is_currently_paused = self.state.is_paused();
                let mutation = Command::create_streaming_job_to_mutation(
                    &info,
                    &job_type,
                    is_currently_paused,
                    &mut edges,
                    partial_graph_manager.control_stream_manager(),
                    actor_cdc_table_snapshot_splits,
                )?;

                (
                    Some(mutation),
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::CreateStreamingJob {
                        info,
                        job_type,
                        cross_db_snapshot_backfill_info,
                    },
                )
            }

            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),

            // Pause/Resume flip the worker state first, then build a mutation
            // from the *previous* paused flag.
            Some(Command::Pause) => {
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(true);
                let mutation = Command::pause_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Pause".to_owned()),
                )
            }

            Some(Command::Resume) => {
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(false);
                let mutation = Command::resume_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Resume".to_owned()),
                )
            }

            // Throttling is also forwarded to matching creating jobs below via
            // `throttle_for_creating_jobs`.
            Some(Command::Throttle { jobs, config }) => {
                let mutation = Some(Command::throttle_to_mutation(&config));
                throttle_for_creating_jobs = Some((jobs, config));
                self.apply_simple_command(mutation, "Throttle")
            }

            Some(Command::DropStreamingJobs {
                streaming_job_ids,
                actors,
                unregistered_state_table_ids,
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
            }) => {
                // Detach dropped sinks from their target-table fragments first.
                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
                    self.database_info
                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
                }

                // Collect base info *before* removing fragments: the dropped
                // actors still need to collect this barrier.
                let (table_ids, node_actors) = self.collect_base_info();

                self.database_info
                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());

                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
                    &actors,
                    &dropped_sink_fragment_by_targets,
                ));
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::DropStreamingJobs {
                        streaming_job_ids,
                        unregistered_state_table_ids,
                    },
                )
            }

            Some(Command::RescheduleIntent {
                reschedule_plan, ..
            }) => {
                // `None` plans were rejected at the top of this function.
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");

                // Register the added actors (with their vnode bitmaps and
                // splits) and the bitmap updates of surviving actors.
                for (fragment_id, reschedule) in reschedules {
                    self.database_info.pre_apply_reschedule(
                        *fragment_id,
                        reschedule
                            .added_actors
                            .iter()
                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
                                actors.iter().map(|actor_id| {
                                    (
                                        *actor_id,
                                        InflightActorInfo {
                                            worker_id: *node_id,
                                            vnode_bitmap: reschedule
                                                .newly_created_actors
                                                .get(actor_id)
                                                .expect("should exist")
                                                .0
                                                .0
                                                .vnode_bitmap
                                                .clone(),
                                            splits: reschedule
                                                .actor_splits
                                                .get(actor_id)
                                                .cloned()
                                                .unwrap_or_default(),
                                        },
                                    )
                                })
                            })
                            .collect(),
                        reschedule
                            .vnode_bitmap_updates
                            .iter()
                            // Newly created actors already carry their bitmap;
                            // only surviving actors need bitmap updates.
                            .filter(|(actor_id, _)| {
                                !reschedule.newly_created_actors.contains_key(*actor_id)
                            })
                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                            .collect(),
                        reschedule.actor_splits.clone(),
                    );
                }

                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::reschedule_actors_to_create(
                    reschedules,
                    fragment_actors,
                    &self.database_info,
                    partial_graph_manager.control_stream_manager(),
                ));

                // Removed actors are pruned after base info was collected, so
                // they still collect this barrier.
                self.database_info
                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.removed_actors.iter().cloned().collect(),
                        )
                    }));

                let mutation = Command::reschedule_to_mutation(
                    reschedules,
                    fragment_actors,
                    partial_graph_manager.control_stream_manager(),
                    &mut self.database_info,
                )?;

                // Move the plan's reschedules into the post-collect command.
                let reschedules = reschedule_plan
                    .expect("reschedule intent should be resolved in global barrier worker")
                    .reschedules;
                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::Reschedule { reschedules },
                )
            }

            Some(Command::ReplaceStreamJob(plan)) => {
                let mut edges = self.database_info.build_edge(
                    None,
                    Some(&plan),
                    None,
                    partial_graph_manager.control_stream_manager(),
                );

                // Register replacement fragments, rewire merge upstreams, and
                // (optionally) register auto-refresh-schema sink fragments.
                self.database_info.pre_apply_new_fragments(
                    plan.new_fragments
                        .new_fragment_info(&plan.init_split_assignment)
                        .map(|(fragment_id, new_fragment)| {
                            (fragment_id, plan.streaming_job.id(), new_fragment)
                        }),
                );
                for (fragment_id, replace_map) in &plan.replace_upstream {
                    self.database_info
                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
                }
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    self.database_info
                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
                            (
                                sink.new_fragment.fragment_id,
                                sink.original_sink.id.as_job_id(),
                                sink.new_fragment_info(),
                            )
                        }));
                }

                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
                    &plan,
                    &mut edges,
                    &self.database_info,
                ));

                // Remove the replaced (old) fragments, after base info has been
                // collected so their actors still see this barrier.
                {
                    let mut fragment_ids_to_remove: Vec<_> = plan
                        .old_fragments
                        .fragments
                        .values()
                        .map(|f| f.fragment_id)
                        .collect();
                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                        fragment_ids_to_remove
                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
                    }
                    self.database_info
                        .post_apply_remove_fragments(fragment_ids_to_remove);
                }

                let mutation = Command::replace_stream_job_to_mutation(
                    &plan,
                    &mut edges,
                    &mut self.database_info,
                )?;

                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::ReplaceStreamJob(plan),
                )
            }

            Some(Command::SourceChangeSplit(split_state)) => {
                self.database_info.pre_apply_split_assignments(
                    split_state
                        .split_assignment
                        .iter()
                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
                );

                let mutation = Some(Command::source_change_split_to_mutation(
                    &split_state.split_assignment,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::SourceChangeSplit {
                        split_assignment: split_state.split_assignment,
                    },
                )
            }

            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // Dropping an unknown subscription is tolerated with a warning
                // (best-effort cleanup), not an error.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Pure metadata update; no mutation needs to travel downstream.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };

        // If no command mutation was produced, a checkpoint barrier may instead
        // carry an `Update` mutation that merges finished snapshot-backfill
        // jobs back into the upstream graph (or a `StartFragmentBackfill`).
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // Drop the implicit snapshot-backfill subscriptions on
                        // each upstream MV now that backfill has finished.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The finished job's state tables now commit with the
                        // main database graph.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Re-create the job's actors in the main graph under
                        // freshly allocated global actor ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // old actor id -> new global actor id.
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        // Rekey fragment actor maps to the new actor ids.
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // Build dispatcher edges between the existing upstream
                        // fragments and the job's re-created fragments, all in
                        // the database's root partial graph.
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|info| (info, partial_graph_id)),
                            partial_graph_manager.control_stream_manager(),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        // Upstream dispatchers switch from the old (backfill)
                        // actor ids to the new global ids: removed ids are
                        // mapped back through `actor_mapping` in reverse.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        // All built edges must have been consumed above.
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        // The job is now a fully created member of the main
                        // database graph.
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // No jobs merged; optionally kick off pending fragment
                    // backfills instead.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        // Forward this barrier to every still-creating job (jobs that just
        // merged above already received it via `start_consume_upstream`),
        // attaching a throttle mutation when the Throttle command targets it.
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some((ref jobs, ref config)) =
                    throttle_for_creating_jobs
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    partial_graph_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        // Inject the barrier into the database's root partial graph.
        partial_graph_manager.inject_barrier(
            to_partial_graph_id(self.database_id, None),
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            command: post_collect_command,
        })
    }
1080}