1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19
20use risingwave_common::bail;
21use risingwave_common::catalog::TableId;
22use risingwave_common::id::JobId;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
27use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
28use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{
31 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
32 PbUpstreamSinkInfo, ThrottleMutation,
33};
34use tracing::warn;
35
36use crate::MetaResult;
37use crate::barrier::cdc_progress::CdcTableBackfillTracker;
38use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
39use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
40use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
41use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
42use crate::barrier::info::{
43 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
44 SubscriberType,
45};
46use crate::barrier::notifier::Notifier;
47use crate::barrier::partial_graph::PartialGraphManager;
48use crate::barrier::rpc::to_partial_graph_id;
49use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
50use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
51use crate::model::{ActorId, ActorNewNoShuffle, FragmentId, StreamJobActorsToCreate};
52use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
53use crate::stream::{
54 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
55 fill_snapshot_backfill_epoch,
56};
57
/// Per-database epoch/pause bookkeeping for the barrier worker.
///
/// Tracks the epoch of the barrier currently in flight, the non-checkpoint
/// barriers accumulated since the last checkpoint, and whether the streaming
/// graph is currently paused.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The `prev_epoch` that the next injected barrier will carry; advanced to
    /// the new `curr_epoch` on every call to `next_barrier_info`.
    in_flight_prev_epoch: TracedEpoch,

    /// Epoch values (`Epoch` as raw `u64`) of barriers issued since the last
    /// checkpoint; drained into `BarrierKind::Checkpoint` when a checkpoint
    /// barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is paused (toggled by Pause/Resume commands
    /// via `set_is_paused`).
    is_paused: bool,
}
72
73impl BarrierWorkerState {
74 pub(super) fn new() -> Self {
75 Self {
76 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
77 pending_non_checkpoint_barriers: vec![],
78 is_paused: false,
79 }
80 }
81
82 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
83 Self {
84 in_flight_prev_epoch,
85 pending_non_checkpoint_barriers: vec![],
86 is_paused,
87 }
88 }
89
90 pub fn is_paused(&self) -> bool {
91 self.is_paused
92 }
93
94 fn set_is_paused(&mut self, is_paused: bool) {
95 if self.is_paused != is_paused {
96 tracing::info!(
97 currently_paused = self.is_paused,
98 newly_paused = is_paused,
99 "update paused state"
100 );
101 self.is_paused = is_paused;
102 }
103 }
104
105 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
106 &self.in_flight_prev_epoch
107 }
108
109 pub fn next_barrier_info(
111 &mut self,
112 is_checkpoint: bool,
113 curr_epoch: TracedEpoch,
114 ) -> BarrierInfo {
115 assert!(
116 self.in_flight_prev_epoch.value() < curr_epoch.value(),
117 "curr epoch regress. {} > {}",
118 self.in_flight_prev_epoch.value(),
119 curr_epoch.value()
120 );
121 let prev_epoch = self.in_flight_prev_epoch.clone();
122 self.in_flight_prev_epoch = curr_epoch.clone();
123 self.pending_non_checkpoint_barriers
124 .push(prev_epoch.value().0);
125 let kind = if is_checkpoint {
126 let epochs = take(&mut self.pending_non_checkpoint_barriers);
127 BarrierKind::Checkpoint(epochs)
128 } else {
129 BarrierKind::Barrier
130 };
131 BarrierInfo {
132 prev_epoch,
133 curr_epoch,
134 kind,
135 }
136 }
137}
138
/// Outcome of applying a command at a barrier, consumed by the caller after
/// `DatabaseCheckpointControl::apply_command` has injected the barrier.
pub(super) struct ApplyCommandInfo {
    /// Max subscription retention per upstream MV table, taken from the
    /// database's registered subscribers at this barrier.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State tables whose changes are committed at this barrier (includes
    /// tables of snapshot-backfill jobs that finished at this barrier).
    pub table_ids_to_commit: HashSet<TableId>,
    /// Snapshot-backfill jobs that started consuming upstream at this barrier;
    /// the caller must wait for them before completing the checkpoint.
    pub jobs_to_wait: HashSet<JobId>,
    /// Command to run after the barrier has been collected.
    pub command: PostCollectCommand,
}
145
/// Intermediate result of resolving a single command, in order:
/// the mutation to attach to the barrier (if any), the state tables to commit,
/// new actors to build on compute nodes (if any), the per-worker actors the
/// barrier must be collected from, and the post-collect command.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
155
156impl DatabaseCheckpointControl {
157 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
159 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
160 let node_actors =
161 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
162 (table_ids_to_commit, node_actors)
163 }
164
165 fn apply_simple_command(
169 &self,
170 mutation: Option<Mutation>,
171 command_name: &'static str,
172 ) -> ApplyCommandResult {
173 let (table_ids, node_actors) = self.collect_base_info();
174 (
175 mutation,
176 table_ids,
177 None,
178 node_actors,
179 PostCollectCommand::Command(command_name.to_owned()),
180 )
181 }
182
    /// Resolve `command` into the mutation to attach to the next barrier,
    /// apply its pre/post effects to the in-memory `database_info`, and inject
    /// the barrier through `partial_graph_manager`.
    ///
    /// Returns an [`ApplyCommandInfo`] carrying the tables to commit at this
    /// barrier, the snapshot-backfill jobs this barrier must wait for, and the
    /// command to run after collection.
    ///
    /// NOTE(review): the ordering of `pre_apply_*` / `collect_base_info` /
    /// `post_apply_*` calls within each arm appears deliberate (the base info
    /// is collected at different points relative to the applies per command);
    /// do not reorder without confirming against `InflightDatabaseInfo`.
    pub(super) fn apply_command(
        &mut self,
        command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
        // An unresolved reschedule intent must never reach here: panic in
        // debug builds, return a recoverable error in release builds.
        debug_assert!(
            !matches!(
                command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent must be resolved before apply"
        );
        if matches!(
            command,
            Some(Command::RescheduleIntent {
                reschedule_plan: None,
                ..
            })
        ) {
            bail!("reschedule intent must be resolved before apply");
        }

        // Resolve the per-actor source split assignment for a newly created
        // streaming job: the job's initial assignment, extended with backfill
        // splits looked up from the upstream fragments' current actor splits.
        fn resolve_source_splits(
            info: &CreateStreamingJobCommandInfo,
            actor_no_shuffle: &ActorNewNoShuffle,
            database_info: &InflightDatabaseInfo,
        ) -> MetaResult<SplitAssignment> {
            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
                &info.stream_job_fragments,
                &info.init_split_assignment,
            )?;
            resolved.extend(SourceManager::resolve_backfill_splits(
                &info.stream_job_fragments,
                actor_no_shuffle,
                |fragment_id, actor_id| {
                    database_info
                        .fragment(fragment_id)
                        .actors
                        .get(&actor_id)
                        .map(|info| info.splits.clone())
                },
            )?);
            Ok(resolved)
        }

        // Set only by `Command::Throttle`; consulted later so that a throttle
        // targeting a creating (snapshot-backfill) job is forwarded to that
        // job's own barrier stream instead of the main graph's.
        let mut throttle_for_creating_jobs: Option<(
            HashSet<JobId>,
            HashMap<FragmentId, ThrottleConfig>,
        )> = None;

        // Resolve the command into (mutation, tables to commit, actors to
        // create, actors to collect from, post-collect command).
        let (
            mutation,
            mut table_ids_to_commit,
            mut actors_to_create,
            mut node_actors,
            post_collect_command,
        ) = match command {
            // Plain barrier with no command attached.
            None => self.apply_simple_command(None, "barrier"),
            // Snapshot-backfill job creation: runs on its own partial graph
            // (via `CreatingStreamingJobControl`) rather than joining the main
            // graph immediately.
            Some(Command::CreateStreamingJob {
                mut info,
                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
                cross_db_snapshot_backfill_info,
            }) => {
                {
                    assert!(!self.state.is_paused());
                    // The current barrier's prev epoch becomes the snapshot
                    // epoch every upstream MV is backfilled at.
                    let snapshot_epoch = barrier_info.prev_epoch();
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(&snapshot_backfill_info),
                            &cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let mut edges = self.database_info.build_edge(
                        Some((&info, true)),
                        None,
                        None,
                        partial_graph_manager.control_stream_manager(),
                    );
                    let actor_no_shuffle = edges.extract_no_shuffle();

                    let resolved_split_assignment =
                        resolve_source_splits(&info, &actor_no_shuffle, &self.database_info)?;

                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
                    else {
                        panic!("duplicated creating snapshot backfill job {job_id}");
                    };

                    let job = CreatingStreamingJobControl::new(
                        entry,
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info,
                            resolved_split_assignment: resolved_split_assignment.clone(),
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        partial_graph_manager,
                        &mut edges,
                        &resolved_split_assignment,
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    // Register the creating job as a snapshot-backfill
                    // subscriber of each upstream MV so their logs are
                    // retained until the backfill finishes.
                    for upstream_mv_table_id in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                    {
                        self.database_info.register_subscriber(
                            upstream_mv_table_id.as_job_id(),
                            info.streaming_job.id().as_subscriber_id(),
                            SubscriberType::SnapshotBackfill,
                        );
                    }

                    let mutation = Command::create_streaming_job_to_mutation(
                        &info,
                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                        self.state.is_paused(),
                        &mut edges,
                        partial_graph_manager.control_stream_manager(),
                        None,
                        &resolved_split_assignment,
                    )?;

                    // No actors to create on the main graph: the job's actors
                    // live in its own partial graph until merge.
                    let (table_ids, node_actors) = self.collect_base_info();
                    (
                        Some(mutation),
                        table_ids,
                        None,
                        node_actors,
                        PostCollectCommand::barrier(),
                    )
                }
            }
            // Regular streaming-job creation (including sink-into-table): the
            // job joins the main graph at this barrier.
            Some(Command::CreateStreamingJob {
                mut info,
                job_type,
                cross_db_snapshot_backfill_info,
            }) => {
                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                    fill_snapshot_backfill_epoch(
                        &mut fragment.nodes,
                        None,
                        &cross_db_snapshot_backfill_info,
                    )?;
                }

                let new_upstream_sink =
                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                        Some(ctx)
                    } else {
                        None
                    };

                let mut edges = self.database_info.build_edge(
                    Some((&info, false)),
                    None,
                    new_upstream_sink,
                    partial_graph_manager.control_stream_manager(),
                );
                let actor_no_shuffle = edges.extract_no_shuffle();

                let resolved_split_assignment =
                    resolve_source_splits(&info, &actor_no_shuffle, &self.database_info)?;

                // Parallelized CDC table backfill tracks per-split progress.
                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };
                self.database_info
                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
                self.database_info.pre_apply_new_fragments(
                    info.stream_job_fragments
                        .new_fragment_info(&resolved_split_assignment)
                        .map(|(fragment_id, fragment_infos)| {
                            (fragment_id, info.streaming_job.id(), fragment_infos)
                        }),
                );
                // Sink-into-table additionally wires the new sink fragment as
                // an upstream of the target table's fragment.
                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    self.database_info.pre_apply_add_node_upstream(
                        downstream_fragment_id,
                        &PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        },
                    );
                }

                // Collected after pre-apply so the new job's tables/actors are
                // included in this barrier's footprint.
                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
                    &info, &mut edges,
                ));

                let actor_cdc_table_snapshot_splits = self
                    .database_info
                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;

                let is_currently_paused = self.state.is_paused();
                let mutation = Command::create_streaming_job_to_mutation(
                    &info,
                    &job_type,
                    is_currently_paused,
                    &mut edges,
                    partial_graph_manager.control_stream_manager(),
                    actor_cdc_table_snapshot_splits,
                    &resolved_split_assignment,
                )?;

                (
                    Some(mutation),
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::CreateStreamingJob {
                        info,
                        job_type,
                        cross_db_snapshot_backfill_info,
                        resolved_split_assignment,
                    },
                )
            }

            // Flush: checkpoint-forcing no-op mutation.
            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),

            Some(Command::Pause) => {
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(true);
                // The mutation carries the previous state so compute nodes can
                // no-op if already paused.
                let mutation = Command::pause_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Pause".to_owned()),
                )
            }

            Some(Command::Resume) => {
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(false);
                let mutation = Command::resume_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Resume".to_owned()),
                )
            }

            Some(Command::Throttle { jobs, config }) => {
                let mutation = Some(Command::throttle_to_mutation(&config));
                // Remember targets so creating jobs get the throttle on their
                // own barrier streams below.
                throttle_for_creating_jobs = Some((jobs, config));
                self.apply_simple_command(mutation, "Throttle")
            }

            Some(Command::DropStreamingJobs {
                streaming_job_ids,
                actors,
                unregistered_state_table_ids,
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
            }) => {
                // Detach dropped sink fragments from their target tables
                // before computing the footprint.
                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
                    self.database_info
                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
                }

                let (table_ids, node_actors) = self.collect_base_info();

                // Removed fragments are dropped from the in-memory info right
                // away; the barrier still collects from the pre-drop footprint
                // captured above.
                self.database_info
                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());

                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
                    &actors,
                    &dropped_sink_fragment_by_targets,
                ));
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::DropStreamingJobs {
                        streaming_job_ids,
                        unregistered_state_table_ids,
                    },
                )
            }

            // Reschedule: plan was resolved earlier (checked at fn entry).
            Some(Command::RescheduleIntent {
                reschedule_plan, ..
            }) => {
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");

                // Pre-apply: add new actors (with their vnode bitmaps and
                // splits), update surviving actors' bitmaps, and set splits.
                for (fragment_id, reschedule) in reschedules {
                    self.database_info.pre_apply_reschedule(
                        *fragment_id,
                        reschedule
                            .added_actors
                            .iter()
                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
                                actors.iter().map(|actor_id| {
                                    (
                                        *actor_id,
                                        InflightActorInfo {
                                            worker_id: *node_id,
                                            vnode_bitmap: reschedule
                                                .newly_created_actors
                                                .get(actor_id)
                                                .expect("should exist")
                                                .0
                                                .0
                                                .vnode_bitmap
                                                .clone(),
                                            splits: reschedule
                                                .actor_splits
                                                .get(actor_id)
                                                .cloned()
                                                .unwrap_or_default(),
                                        },
                                    )
                                })
                            })
                            .collect(),
                        reschedule
                            .vnode_bitmap_updates
                            .iter()
                            .filter(|(actor_id, _)| {
                                !reschedule.newly_created_actors.contains_key(*actor_id)
                            })
                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                            .collect(),
                        reschedule.actor_splits.clone(),
                    );
                }

                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::reschedule_actors_to_create(
                    reschedules,
                    fragment_actors,
                    &self.database_info,
                    partial_graph_manager.control_stream_manager(),
                ));

                // Post-apply: drop removed actors from the in-memory info.
                self.database_info
                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.removed_actors.iter().cloned().collect(),
                        )
                    }));

                let mutation = Command::reschedule_to_mutation(
                    reschedules,
                    fragment_actors,
                    partial_graph_manager.control_stream_manager(),
                    &mut self.database_info,
                )?;

                // Consume the plan to move `reschedules` into the post-collect
                // command without cloning.
                let reschedules = reschedule_plan
                    .expect("reschedule intent should be resolved in global barrier worker")
                    .reschedules;
                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::Reschedule { reschedules },
                )
            }

            // Replace (alter) a streaming job: create new fragments, rewire
            // upstreams, and drop old fragments at this barrier.
            Some(Command::ReplaceStreamJob(plan)) => {
                let mut edges = self.database_info.build_edge(
                    None,
                    Some(&plan),
                    None,
                    partial_graph_manager.control_stream_manager(),
                );
                let actor_no_shuffle = edges.extract_no_shuffle();

                // Splits either come pre-discovered with the plan or are
                // aligned from the previous job's current assignment.
                let resolved_split_assignment = match &plan.split_plan {
                    ReplaceJobSplitPlan::Discovered(discovered) => {
                        SourceManager::resolve_fragment_to_actor_splits(
                            &plan.new_fragments,
                            discovered,
                        )?
                    }
                    ReplaceJobSplitPlan::AlignFromPrevious => {
                        SourceManager::resolve_replace_source_splits(
                            &plan.new_fragments,
                            &plan.replace_upstream,
                            &actor_no_shuffle,
                            |_fragment_id, actor_id| {
                                self.database_info.fragment_infos().find_map(|fragment| {
                                    fragment
                                        .actors
                                        .get(&actor_id)
                                        .map(|info| info.splits.clone())
                                })
                            },
                        )?
                    }
                };

                self.database_info.pre_apply_new_fragments(
                    plan.new_fragments
                        .new_fragment_info(&resolved_split_assignment)
                        .map(|(fragment_id, new_fragment)| {
                            (fragment_id, plan.streaming_job.id(), new_fragment)
                        }),
                );
                for (fragment_id, replace_map) in &plan.replace_upstream {
                    self.database_info
                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
                }
                // Sinks whose schema auto-refreshes get replacement fragments
                // of their own.
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    self.database_info
                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
                            (
                                sink.new_fragment.fragment_id,
                                sink.original_sink.id.as_job_id(),
                                sink.new_fragment_info(),
                            )
                        }));
                }

                let (table_ids, node_actors) = self.collect_base_info();

                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
                    &plan,
                    &mut edges,
                    &self.database_info,
                ));

                // Remove the replaced (old) fragments from the in-memory info.
                {
                    let mut fragment_ids_to_remove: Vec<_> = plan
                        .old_fragments
                        .fragments
                        .values()
                        .map(|f| f.fragment_id)
                        .collect();
                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                        fragment_ids_to_remove
                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
                    }
                    self.database_info
                        .post_apply_remove_fragments(fragment_ids_to_remove);
                }

                let mutation = Command::replace_stream_job_to_mutation(
                    &plan,
                    &mut edges,
                    &mut self.database_info,
                    &resolved_split_assignment,
                )?;

                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::ReplaceStreamJob {
                        plan,
                        resolved_split_assignment,
                    },
                )
            }

            Some(Command::SourceChangeSplit(split_state)) => {
                self.database_info.pre_apply_split_assignments(
                    split_state
                        .split_assignment
                        .iter()
                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
                );

                let mutation = Some(Command::source_change_split_to_mutation(
                    &split_state.split_assignment,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::SourceChangeSplit {
                        split_assignment: split_state.split_assignment,
                    },
                )
            }

            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // Missing registration is tolerated (e.g. after recovery);
                // warn instead of failing.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Meta-side-only change: no mutation needs to reach compute.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            // Refreshable-table commands: simple mutation-only commands.
            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };

        // When no command produced a mutation, this barrier may instead carry
        // a system-generated one: merging finished snapshot-backfill jobs into
        // the main graph (checkpoint barriers only), or starting pending
        // fragment backfills.
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    // Merge each finished job into the main graph: assign
                    // fresh global actor ids, rebuild edges/dispatchers to the
                    // upstream fragments, and emit a single Update mutation.
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // The job's snapshot-backfill subscriptions on the
                        // upstream MVs are no longer needed.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The merged job's state tables now commit with the
                        // main graph.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Re-key all of the job's actors with freshly
                        // generated global actor ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // Build edges between the upstream fragments (already
                        // in the main graph) and the job's re-keyed fragments.
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|fragment| {
                                    (
                                        fragment.fragment_id,
                                        EdgeBuilderFragmentInfo::from_inflight(
                                            fragment,
                                            partial_graph_id,
                                            partial_graph_manager.control_stream_manager(),
                                        ),
                                    )
                                }),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        // Update upstream dispatchers: downstream actor ids
                        // are reverse-mapped to find the old ids to remove,
                        // then the new ids are added.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        // All built edges must have been consumed above.
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // No merge: kick off any pending fragment backfills.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        // Propagate this barrier to every still-creating snapshot-backfill
        // job, attaching a throttle mutation (and the notifiers) if this
        // command throttled that job.
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some((ref jobs, ref config)) =
                    throttle_for_creating_jobs
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    partial_graph_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        // Inject the barrier into the database's main partial graph.
        partial_graph_manager.inject_barrier(
            to_partial_graph_id(self.database_id, None),
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            command: post_collect_command,
        })
    }
1167}