1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
45use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
48use crate::barrier::info::{
49 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
50 SubscriberType,
51};
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::PartialGraphManager;
54use crate::barrier::rpc::to_partial_graph_id;
55use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::scale::{
58 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
59 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
60};
61use crate::model::{
62 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
63 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
64};
65use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
66use crate::stream::{
67 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
68 fill_snapshot_backfill_epoch,
69};
70
/// Per-database state of the barrier worker: tracks the epoch of the latest
/// injected barrier, the prev-epochs accumulated since the last checkpoint,
/// and whether the streaming graph is paused.
pub(in crate::barrier) struct BarrierWorkerState {
    // Epoch of the most recently injected (in-flight) barrier; the next
    // barrier's `prev_epoch` is taken from here (see `next_barrier_info`).
    in_flight_prev_epoch: TracedEpoch,

    // Prev-epochs of barriers injected since the last checkpoint barrier;
    // drained into `BarrierKind::Checkpoint` when a checkpoint is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    // Whether the streaming graph is currently paused.
    is_paused: bool,
}
85
86impl BarrierWorkerState {
87 pub(super) fn new() -> Self {
88 Self {
89 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
90 pending_non_checkpoint_barriers: vec![],
91 is_paused: false,
92 }
93 }
94
95 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
96 Self {
97 in_flight_prev_epoch,
98 pending_non_checkpoint_barriers: vec![],
99 is_paused,
100 }
101 }
102
103 pub fn is_paused(&self) -> bool {
104 self.is_paused
105 }
106
107 fn set_is_paused(&mut self, is_paused: bool) {
108 if self.is_paused != is_paused {
109 tracing::info!(
110 currently_paused = self.is_paused,
111 newly_paused = is_paused,
112 "update paused state"
113 );
114 self.is_paused = is_paused;
115 }
116 }
117
118 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
119 &self.in_flight_prev_epoch
120 }
121
122 pub fn next_barrier_info(
124 &mut self,
125 is_checkpoint: bool,
126 curr_epoch: TracedEpoch,
127 ) -> BarrierInfo {
128 assert!(
129 self.in_flight_prev_epoch.value() < curr_epoch.value(),
130 "curr epoch regress. {} > {}",
131 self.in_flight_prev_epoch.value(),
132 curr_epoch.value()
133 );
134 let prev_epoch = self.in_flight_prev_epoch.clone();
135 self.in_flight_prev_epoch = curr_epoch.clone();
136 self.pending_non_checkpoint_barriers
137 .push(prev_epoch.value().0);
138 let kind = if is_checkpoint {
139 let epochs = take(&mut self.pending_non_checkpoint_barriers);
140 BarrierKind::Checkpoint(epochs)
141 } else {
142 BarrierKind::Barrier
143 };
144 BarrierInfo {
145 prev_epoch,
146 curr_epoch,
147 kind,
148 }
149 }
150}
151
/// Result of [`DatabaseCheckpointControl::apply_command`], handed back to the
/// barrier worker once a command has been applied to the inflight state.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): not populated within this chunk — presumably the maximum
    // subscription retention per upstream MV table; confirm semantics/units
    // against the construction site.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    // State tables whose data must be committed at this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    // NOTE(review): presumably creating jobs whose progress must be awaited
    // before this barrier completes — confirm against callers.
    pub jobs_to_wait: HashSet<JobId>,
    // Command to be handled after the barrier has been collected.
    pub command: PostCollectCommand,
}
158
/// Per-command intermediate result produced by the big `match` in
/// `apply_command`, in order: the barrier mutation to inject (if any), the
/// state tables to commit at this barrier, the actors to create (for
/// commands that spawn new actors), the actor ids to collect per worker, and
/// the command to handle after barrier collection.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
168
/// Output of [`render_actors`]: the freshly rendered actors grouped by
/// fragment, plus the worker each new actor is scheduled on.
pub(super) struct RenderResult {
    // Rendered actors keyed by the fragment they belong to.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    // Worker assignment for every rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
176
177fn resolve_no_shuffle_ensembles(
186 fragments: &StreamJobFragmentsToCreate,
187 upstream_fragment_downstreams: &FragmentDownstreamRelation,
188) -> MetaResult<Vec<NoShuffleEnsemble>> {
189 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
191
192 for (upstream_fid, relations) in &fragments.downstreams {
194 for rel in relations {
195 if rel.dispatcher_type == DispatcherType::NoShuffle {
196 new_no_shuffle
197 .entry(*upstream_fid)
198 .or_default()
199 .insert(rel.downstream_fragment_id);
200 }
201 }
202 }
203
204 for (upstream_fid, relations) in upstream_fragment_downstreams {
206 for rel in relations {
207 if rel.dispatcher_type == DispatcherType::NoShuffle {
208 new_no_shuffle
209 .entry(*upstream_fid)
210 .or_default()
211 .insert(rel.downstream_fragment_id);
212 }
213 }
214 }
215
216 let mut ensembles = if new_no_shuffle.is_empty() {
217 Vec::new()
218 } else {
219 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
221 .iter()
222 .flat_map(|(upstream_fid, downstream_fids)| {
223 downstream_fids
224 .iter()
225 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
226 })
227 .collect();
228
229 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
230 .iter()
231 .flat_map(|(u, d)| [*u, *d])
232 .collect::<HashSet<_>>()
233 .into_iter()
234 .collect();
235
236 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
237 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
238 };
239
240 let covered: HashSet<FragmentId> = ensembles
242 .iter()
243 .flat_map(|e| e.component_fragments())
244 .collect();
245 for fragment_id in fragments.inner.fragments.keys() {
246 if !covered.contains(fragment_id) {
247 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
248 }
249 }
250
251 Ok(ensembles)
252}
253
254fn render_actors(
265 fragments: &StreamJobFragmentsToCreate,
266 database_info: &InflightDatabaseInfo,
267 definition: &str,
268 ctx: &StreamContext,
269 streaming_job_model: &streaming_job::Model,
270 actor_id_counter: &AtomicU32,
271 worker_map: &HashMap<WorkerId, WorkerNode>,
272 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
273 ensembles: &[NoShuffleEnsemble],
274 database_resource_group: &str,
275) -> MetaResult<RenderResult> {
276 let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
279 HashMap::new();
280
281 for ensemble in ensembles {
282 let existing_fragment_ids: Vec<FragmentId> = ensemble
287 .component_fragments()
288 .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
289 .collect();
290
291 let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
292 let template = EnsembleActorTemplate::from_existing_inflight_fragment(
293 database_info.fragment(first_existing),
294 );
295
296 for &other_fragment_id in &existing_fragment_ids[1..] {
299 let other = EnsembleActorTemplate::from_existing_inflight_fragment(
300 database_info.fragment(other_fragment_id),
301 );
302 template.assert_aligned_with(&other, first_existing, other_fragment_id);
303 }
304
305 template
306 } else {
307 let first_component = ensemble
309 .component_fragments()
310 .next()
311 .expect("ensemble must have at least one component");
312 let fragment = &fragments.inner.fragments[&first_component];
313 let distribution_type: DistributionType = fragment.distribution_type.into();
314 let vnode_count = fragment.vnode_count();
315
316 for fragment_id in ensemble.component_fragments() {
318 let f = &fragments.inner.fragments[&fragment_id];
319 assert_eq!(
320 vnode_count,
321 f.vnode_count(),
322 "component fragments {} and {} in the same no-shuffle ensemble have \
323 different vnode counts: {} vs {}",
324 first_component,
325 fragment_id,
326 vnode_count,
327 f.vnode_count(),
328 );
329 }
330
331 EnsembleActorTemplate::render_new(
332 streaming_job_model,
333 worker_map,
334 adaptive_parallelism_strategy,
335 None,
336 database_resource_group.to_owned(),
337 distribution_type,
338 vnode_count,
339 )?
340 };
341
342 for fragment_id in ensemble.component_fragments() {
344 if !fragments.inner.fragments.contains_key(&fragment_id) {
345 continue; }
347 let fragment = &fragments.inner.fragments[&fragment_id];
348 let distribution_type: DistributionType = fragment.distribution_type.into();
349 let aligner =
350 ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
351 let assignments = aligner.align_component_actor(distribution_type);
352 actor_assignments.insert(fragment_id, assignments);
353 }
354 }
355
356 let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
358 let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
359
360 for (fragment_id, assignments) in &actor_assignments {
361 let mut actors = Vec::with_capacity(assignments.len());
362 for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
363 result_actor_location.insert(actor_id, *worker_id);
364 actors.push(StreamActor {
365 actor_id,
366 fragment_id: *fragment_id,
367 vnode_bitmap: vnode_bitmap.clone(),
368 mview_definition: definition.to_owned(),
369 expr_context: Some(ctx.to_expr_context()),
370 config_override: ctx.config_override.clone(),
371 });
372 }
373 result_stream_actors.insert(*fragment_id, actors);
374 }
375
376 Ok(RenderResult {
377 stream_actors: result_stream_actors,
378 actor_location: result_actor_location,
379 })
380}
381impl DatabaseCheckpointControl {
382 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
384 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
385 let node_actors =
386 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
387 (table_ids_to_commit, node_actors)
388 }
389
390 fn apply_simple_command(
394 &self,
395 mutation: Option<Mutation>,
396 command_name: &'static str,
397 ) -> ApplyCommandResult {
398 let (table_ids, node_actors) = self.collect_base_info();
399 (
400 mutation,
401 table_ids,
402 None,
403 node_actors,
404 PostCollectCommand::Command(command_name.to_owned()),
405 )
406 }
407
408 pub(super) fn apply_command(
411 &mut self,
412 command: Option<Command>,
413 notifiers: &mut Vec<Notifier>,
414 barrier_info: &BarrierInfo,
415 partial_graph_manager: &mut PartialGraphManager,
416 hummock_version_stats: &HummockVersionStats,
417 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
418 worker_nodes: &HashMap<WorkerId, WorkerNode>,
419 ) -> MetaResult<ApplyCommandInfo> {
420 debug_assert!(
421 !matches!(
422 command,
423 Some(Command::RescheduleIntent {
424 reschedule_plan: None,
425 ..
426 })
427 ),
428 "reschedule intent must be resolved before apply"
429 );
430 if matches!(
431 command,
432 Some(Command::RescheduleIntent {
433 reschedule_plan: None,
434 ..
435 })
436 ) {
437 bail!("reschedule intent must be resolved before apply");
438 }
439
440 fn resolve_source_splits(
445 info: &CreateStreamingJobCommandInfo,
446 render_result: &RenderResult,
447 actor_no_shuffle: &ActorNewNoShuffle,
448 database_info: &InflightDatabaseInfo,
449 ) -> MetaResult<SplitAssignment> {
450 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
451 .stream_actors
452 .iter()
453 .map(|(fragment_id, actors)| {
454 (
455 *fragment_id,
456 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
457 )
458 })
459 .collect();
460 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
461 &info.stream_job_fragments,
462 &info.init_split_assignment,
463 &fragment_actor_ids,
464 )?;
465 resolved.extend(SourceManager::resolve_backfill_splits(
466 &info.stream_job_fragments,
467 actor_no_shuffle,
468 |fragment_id, actor_id| {
469 database_info
470 .fragment(fragment_id)
471 .actors
472 .get(&actor_id)
473 .map(|info| info.splits.clone())
474 },
475 )?);
476 Ok(resolved)
477 }
478
479 let mut throttle_for_creating_jobs: Option<(
481 HashSet<JobId>,
482 HashMap<FragmentId, ThrottleConfig>,
483 )> = None;
484
485 let (
489 mutation,
490 mut table_ids_to_commit,
491 mut actors_to_create,
492 mut node_actors,
493 post_collect_command,
494 ) = match command {
495 None => self.apply_simple_command(None, "barrier"),
496 Some(Command::CreateStreamingJob {
497 mut info,
498 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
499 cross_db_snapshot_backfill_info,
500 }) => {
501 let ensembles = resolve_no_shuffle_ensembles(
502 &info.stream_job_fragments,
503 &info.upstream_fragment_downstreams,
504 )?;
505 let actors = render_actors(
506 &info.stream_job_fragments,
507 &self.database_info,
508 &info.definition,
509 &info.stream_job_fragments.inner.ctx,
510 &info.streaming_job_model,
511 partial_graph_manager
512 .control_stream_manager()
513 .env
514 .actor_id_generator(),
515 worker_nodes,
516 adaptive_parallelism_strategy,
517 &ensembles,
518 &info.database_resource_group,
519 )?;
520 {
521 assert!(!self.state.is_paused());
522 let snapshot_epoch = barrier_info.prev_epoch();
523 for snapshot_backfill_epoch in snapshot_backfill_info
525 .upstream_mv_table_id_to_backfill_epoch
526 .values_mut()
527 {
528 assert_eq!(
529 snapshot_backfill_epoch.replace(snapshot_epoch),
530 None,
531 "must not set previously"
532 );
533 }
534 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
535 fill_snapshot_backfill_epoch(
536 &mut fragment.nodes,
537 Some(&snapshot_backfill_info),
538 &cross_db_snapshot_backfill_info,
539 )?;
540 }
541 let job_id = info.stream_job_fragments.stream_job_id();
542 let snapshot_backfill_upstream_tables = snapshot_backfill_info
543 .upstream_mv_table_id_to_backfill_epoch
544 .keys()
545 .cloned()
546 .collect();
547
548 let mut edges = self.database_info.build_edge(
550 Some((&info, true)),
551 None,
552 None,
553 partial_graph_manager.control_stream_manager(),
554 &actors.stream_actors,
555 &actors.actor_location,
556 );
557 let resolved_split_assignment = resolve_source_splits(
559 &info,
560 &actors,
561 edges.actor_new_no_shuffle(),
562 &self.database_info,
563 )?;
564
565 let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
566 else {
567 panic!("duplicated creating snapshot backfill job {job_id}");
568 };
569
570 let job = CreatingStreamingJobControl::new(
571 entry,
572 CreateSnapshotBackfillJobCommandInfo {
573 info: info.clone(),
574 snapshot_backfill_info: snapshot_backfill_info.clone(),
575 cross_db_snapshot_backfill_info,
576 resolved_split_assignment: resolved_split_assignment.clone(),
577 },
578 take(notifiers),
579 snapshot_backfill_upstream_tables,
580 snapshot_epoch,
581 hummock_version_stats,
582 partial_graph_manager,
583 &mut edges,
584 &resolved_split_assignment,
585 &actors,
586 )?;
587
588 self.database_info
589 .shared_actor_infos
590 .upsert(self.database_id, job.fragment_infos_with_job_id());
591
592 for upstream_mv_table_id in snapshot_backfill_info
593 .upstream_mv_table_id_to_backfill_epoch
594 .keys()
595 {
596 self.database_info.register_subscriber(
597 upstream_mv_table_id.as_job_id(),
598 info.streaming_job.id().as_subscriber_id(),
599 SubscriberType::SnapshotBackfill,
600 );
601 }
602
603 let mutation = Command::create_streaming_job_to_mutation(
604 &info,
605 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
606 self.state.is_paused(),
607 &mut edges,
608 partial_graph_manager.control_stream_manager(),
609 None,
610 &resolved_split_assignment,
611 &actors.stream_actors,
612 &actors.actor_location,
613 )?;
614
615 let (table_ids, node_actors) = self.collect_base_info();
616 (
617 Some(mutation),
618 table_ids,
619 None,
620 node_actors,
621 PostCollectCommand::barrier(),
622 )
623 }
624 }
625 Some(Command::CreateStreamingJob {
626 mut info,
627 job_type,
628 cross_db_snapshot_backfill_info,
629 }) => {
630 let ensembles = resolve_no_shuffle_ensembles(
631 &info.stream_job_fragments,
632 &info.upstream_fragment_downstreams,
633 )?;
634 let actors = render_actors(
635 &info.stream_job_fragments,
636 &self.database_info,
637 &info.definition,
638 &info.stream_job_fragments.inner.ctx,
639 &info.streaming_job_model,
640 partial_graph_manager
641 .control_stream_manager()
642 .env
643 .actor_id_generator(),
644 worker_nodes,
645 adaptive_parallelism_strategy,
646 &ensembles,
647 &info.database_resource_group,
648 )?;
649 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
650 fill_snapshot_backfill_epoch(
651 &mut fragment.nodes,
652 None,
653 &cross_db_snapshot_backfill_info,
654 )?;
655 }
656
657 let new_upstream_sink =
659 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
660 Some(ctx)
661 } else {
662 None
663 };
664
665 let mut edges = self.database_info.build_edge(
666 Some((&info, false)),
667 None,
668 new_upstream_sink,
669 partial_graph_manager.control_stream_manager(),
670 &actors.stream_actors,
671 &actors.actor_location,
672 );
673 let resolved_split_assignment = resolve_source_splits(
675 &info,
676 &actors,
677 edges.actor_new_no_shuffle(),
678 &self.database_info,
679 )?;
680
681 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
683 let (fragment, _) =
684 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
685 .expect("should have parallel cdc fragment");
686 Some(CdcTableBackfillTracker::new(
687 fragment.fragment_id,
688 splits.clone(),
689 ))
690 } else {
691 None
692 };
693 self.database_info
694 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
695 self.database_info.pre_apply_new_fragments(
696 info.stream_job_fragments
697 .new_fragment_info(
698 &actors.stream_actors,
699 &actors.actor_location,
700 &resolved_split_assignment,
701 )
702 .map(|(fragment_id, fragment_infos)| {
703 (fragment_id, info.streaming_job.id(), fragment_infos)
704 }),
705 );
706 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
707 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
708 self.database_info.pre_apply_add_node_upstream(
709 downstream_fragment_id,
710 &PbUpstreamSinkInfo {
711 upstream_fragment_id: ctx.sink_fragment_id,
712 sink_output_schema: ctx.sink_output_fields.clone(),
713 project_exprs: ctx.project_exprs.clone(),
714 },
715 );
716 }
717
718 let (table_ids, node_actors) = self.collect_base_info();
719
720 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
722 &info,
723 &mut edges,
724 &actors.stream_actors,
725 &actors.actor_location,
726 ));
727
728 let actor_cdc_table_snapshot_splits = self
730 .database_info
731 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
732
733 let is_currently_paused = self.state.is_paused();
735 let mutation = Command::create_streaming_job_to_mutation(
736 &info,
737 &job_type,
738 is_currently_paused,
739 &mut edges,
740 partial_graph_manager.control_stream_manager(),
741 actor_cdc_table_snapshot_splits,
742 &resolved_split_assignment,
743 &actors.stream_actors,
744 &actors.actor_location,
745 )?;
746
747 (
748 Some(mutation),
749 table_ids,
750 actors_to_create,
751 node_actors,
752 PostCollectCommand::CreateStreamingJob {
753 info,
754 job_type,
755 cross_db_snapshot_backfill_info,
756 resolved_split_assignment,
757 },
758 )
759 }
760
761 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
762
763 Some(Command::Pause) => {
764 let prev_is_paused = self.state.is_paused();
765 self.state.set_is_paused(true);
766 let mutation = Command::pause_to_mutation(prev_is_paused);
767 let (table_ids, node_actors) = self.collect_base_info();
768 (
769 mutation,
770 table_ids,
771 None,
772 node_actors,
773 PostCollectCommand::Command("Pause".to_owned()),
774 )
775 }
776
777 Some(Command::Resume) => {
778 let prev_is_paused = self.state.is_paused();
779 self.state.set_is_paused(false);
780 let mutation = Command::resume_to_mutation(prev_is_paused);
781 let (table_ids, node_actors) = self.collect_base_info();
782 (
783 mutation,
784 table_ids,
785 None,
786 node_actors,
787 PostCollectCommand::Command("Resume".to_owned()),
788 )
789 }
790
791 Some(Command::Throttle { jobs, config }) => {
792 let mutation = Some(Command::throttle_to_mutation(&config));
793 throttle_for_creating_jobs = Some((jobs, config));
794 self.apply_simple_command(mutation, "Throttle")
795 }
796
797 Some(Command::DropStreamingJobs {
798 streaming_job_ids,
799 unregistered_state_table_ids,
800 unregistered_fragment_ids,
801 dropped_sink_fragment_by_targets,
802 }) => {
803 let actors = self
804 .database_info
805 .fragment_infos()
806 .filter(|fragment| {
807 self.database_info
808 .job_id_by_fragment(fragment.fragment_id)
809 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
810 })
811 .flat_map(|fragment| fragment.actors.keys().copied())
812 .collect::<Vec<_>>();
813
814 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
816 self.database_info
817 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
818 }
819
820 let (table_ids, node_actors) = self.collect_base_info();
821
822 self.database_info
824 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
825
826 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
827 &actors,
828 &dropped_sink_fragment_by_targets,
829 ));
830 (
831 mutation,
832 table_ids,
833 None,
834 node_actors,
835 PostCollectCommand::DropStreamingJobs {
836 streaming_job_ids,
837 unregistered_state_table_ids,
838 },
839 )
840 }
841
842 Some(Command::RescheduleIntent {
843 reschedule_plan, ..
844 }) => {
845 let ReschedulePlan {
846 reschedules,
847 fragment_actors,
848 } = reschedule_plan
849 .as_ref()
850 .expect("reschedule intent should be resolved in global barrier worker");
851
852 for (fragment_id, reschedule) in reschedules {
854 self.database_info.pre_apply_reschedule(
855 *fragment_id,
856 reschedule
857 .added_actors
858 .iter()
859 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
860 actors.iter().map(|actor_id| {
861 (
862 *actor_id,
863 InflightActorInfo {
864 worker_id: *node_id,
865 vnode_bitmap: reschedule
866 .newly_created_actors
867 .get(actor_id)
868 .expect("should exist")
869 .0
870 .0
871 .vnode_bitmap
872 .clone(),
873 splits: reschedule
874 .actor_splits
875 .get(actor_id)
876 .cloned()
877 .unwrap_or_default(),
878 },
879 )
880 })
881 })
882 .collect(),
883 reschedule
884 .vnode_bitmap_updates
885 .iter()
886 .filter(|(actor_id, _)| {
887 !reschedule.newly_created_actors.contains_key(*actor_id)
888 })
889 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
890 .collect(),
891 reschedule.actor_splits.clone(),
892 );
893 }
894
895 let (table_ids, node_actors) = self.collect_base_info();
896
897 let actors_to_create = Some(Command::reschedule_actors_to_create(
899 reschedules,
900 fragment_actors,
901 &self.database_info,
902 partial_graph_manager.control_stream_manager(),
903 ));
904
905 self.database_info
907 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
908 (
909 *fragment_id,
910 reschedule.removed_actors.iter().cloned().collect(),
911 )
912 }));
913
914 let mutation = Command::reschedule_to_mutation(
916 reschedules,
917 fragment_actors,
918 partial_graph_manager.control_stream_manager(),
919 &mut self.database_info,
920 )?;
921
922 let reschedules = reschedule_plan
923 .expect("reschedule intent should be resolved in global barrier worker")
924 .reschedules;
925 (
926 mutation,
927 table_ids,
928 actors_to_create,
929 node_actors,
930 PostCollectCommand::Reschedule { reschedules },
931 )
932 }
933
934 Some(Command::ReplaceStreamJob(plan)) => {
935 let ensembles = resolve_no_shuffle_ensembles(
936 &plan.new_fragments,
937 &plan.upstream_fragment_downstreams,
938 )?;
939 let mut render_result = render_actors(
940 &plan.new_fragments,
941 &self.database_info,
942 "", &plan.new_fragments.inner.ctx,
944 &plan.streaming_job_model,
945 partial_graph_manager
946 .control_stream_manager()
947 .env
948 .actor_id_generator(),
949 worker_nodes,
950 adaptive_parallelism_strategy,
951 &ensembles,
952 &plan.database_resource_group,
953 )?;
954
955 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
958 let actor_id_counter = partial_graph_manager
959 .control_stream_manager()
960 .env
961 .actor_id_generator();
962 for sink_ctx in sinks {
963 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
964 let original_frag_info = self.database_info.fragment(original_fragment_id);
965 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
966 original_frag_info,
967 );
968 let new_aligner = ComponentFragmentAligner::new_persistent(
969 &actor_template,
970 actor_id_counter,
971 );
972 let distribution_type: DistributionType =
973 sink_ctx.new_fragment.distribution_type.into();
974 let actor_assignments =
975 new_aligner.align_component_actor(distribution_type);
976 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
977 let mut actors = Vec::with_capacity(actor_assignments.len());
978 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
979 render_result.actor_location.insert(actor_id, *worker_id);
980 actors.push(StreamActor {
981 actor_id,
982 fragment_id: new_fragment_id,
983 vnode_bitmap: vnode_bitmap.clone(),
984 mview_definition: String::new(),
985 expr_context: Some(sink_ctx.ctx.to_expr_context()),
986 config_override: sink_ctx.ctx.config_override.clone(),
987 });
988 }
989 render_result.stream_actors.insert(new_fragment_id, actors);
990 }
991 }
992
993 let mut edges = self.database_info.build_edge(
995 None,
996 Some(&plan),
997 None,
998 partial_graph_manager.control_stream_manager(),
999 &render_result.stream_actors,
1000 &render_result.actor_location,
1001 );
1002
1003 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1005 .stream_actors
1006 .iter()
1007 .map(|(fragment_id, actors)| {
1008 (
1009 *fragment_id,
1010 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1011 )
1012 })
1013 .collect();
1014 let resolved_split_assignment = match &plan.split_plan {
1015 ReplaceJobSplitPlan::Discovered(discovered) => {
1016 SourceManager::resolve_fragment_to_actor_splits(
1017 &plan.new_fragments,
1018 discovered,
1019 &fragment_actor_ids,
1020 )?
1021 }
1022 ReplaceJobSplitPlan::AlignFromPrevious => {
1023 SourceManager::resolve_replace_source_splits(
1024 &plan.new_fragments,
1025 &plan.replace_upstream,
1026 edges.actor_new_no_shuffle(),
1027 |_fragment_id, actor_id| {
1028 self.database_info.fragment_infos().find_map(|fragment| {
1029 fragment
1030 .actors
1031 .get(&actor_id)
1032 .map(|info| info.splits.clone())
1033 })
1034 },
1035 )?
1036 }
1037 };
1038
1039 self.database_info.pre_apply_new_fragments(
1041 plan.new_fragments
1042 .new_fragment_info(
1043 &render_result.stream_actors,
1044 &render_result.actor_location,
1045 &resolved_split_assignment,
1046 )
1047 .map(|(fragment_id, new_fragment)| {
1048 (fragment_id, plan.streaming_job.id(), new_fragment)
1049 }),
1050 );
1051 for (fragment_id, replace_map) in &plan.replace_upstream {
1052 self.database_info
1053 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1054 }
1055 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1056 self.database_info
1057 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1058 (
1059 sink.new_fragment.fragment_id,
1060 sink.original_sink.id.as_job_id(),
1061 sink.new_fragment_info(
1062 &render_result.stream_actors,
1063 &render_result.actor_location,
1064 ),
1065 )
1066 }));
1067 }
1068
1069 let (table_ids, node_actors) = self.collect_base_info();
1070
1071 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1073 &plan,
1074 &mut edges,
1075 &self.database_info,
1076 &render_result.stream_actors,
1077 &render_result.actor_location,
1078 ));
1079
1080 let mutation = Command::replace_stream_job_to_mutation(
1083 &plan,
1084 &mut edges,
1085 &mut self.database_info,
1086 &resolved_split_assignment,
1087 )?;
1088
1089 {
1091 let mut fragment_ids_to_remove: Vec<_> = plan
1092 .old_fragments
1093 .fragments
1094 .values()
1095 .map(|f| f.fragment_id)
1096 .collect();
1097 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1098 fragment_ids_to_remove
1099 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1100 }
1101 self.database_info
1102 .post_apply_remove_fragments(fragment_ids_to_remove);
1103 }
1104
1105 (
1106 mutation,
1107 table_ids,
1108 actors_to_create,
1109 node_actors,
1110 PostCollectCommand::ReplaceStreamJob {
1111 plan,
1112 resolved_split_assignment,
1113 },
1114 )
1115 }
1116
1117 Some(Command::SourceChangeSplit(split_state)) => {
1118 self.database_info.pre_apply_split_assignments(
1120 split_state
1121 .split_assignment
1122 .iter()
1123 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1124 );
1125
1126 let mutation = Some(Command::source_change_split_to_mutation(
1127 &split_state.split_assignment,
1128 ));
1129 let (table_ids, node_actors) = self.collect_base_info();
1130 (
1131 mutation,
1132 table_ids,
1133 None,
1134 node_actors,
1135 PostCollectCommand::SourceChangeSplit {
                        // (Tail of the preceding arm.) Forward the split assignment
                        // computed earlier for this command into its post-collect record.
                        split_assignment: split_state.split_assignment,
                    },
                )
            }

            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Record the subscriber in the in-memory database info so the
                // upstream MV's retention accounting includes this subscription.
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                // Broadcast the subscription creation to compute nodes via a
                // barrier mutation.
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // Unregistering can legitimately find nothing (e.g. state already
                // cleaned up elsewhere — TODO confirm the exact scenario), so this
                // only warns instead of failing the barrier.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Pure metadata change: update the tracked retention and emit a
                // barrier with no mutation payload.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                // Propagate new connector properties; `config` is moved into the
                // post-collect command so it can be persisted after collection.
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                // Only the source id is needed for the mutation; the table id is
                // deliberately ignored here.
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                // Fallible: resolving the target against the current database
                // info may fail, which aborts the command with `?`.
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };

        // Jobs whose snapshot backfill finishes at this barrier. They are merged
        // into the main (upstream) graph below and reported via `jobs_to_wait`.
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        // If the command produced no mutation, this barrier may instead carry
        // either (a) an Update mutation that merges finished snapshot-backfill
        // jobs into the upstream graph, or (b) a StartFragmentBackfill mutation
        // for pending backfill fragments.
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                // Merging to upstream only happens on checkpoint barriers.
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
                            // Each job id appears at most once in the map.
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    // Ensure the outer `actors_to_create` option is populated so
                    // newly assigned actors can be appended per worker.
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // The job's implicit snapshot-backfill subscriptions on
                        // its upstream MV tables are dropped as part of the merge.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            // Invariant: the subscriber being removed must be a
                            // SnapshotBackfill-typed one registered earlier.
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The merged job's state tables now commit with the main
                        // graph's barriers.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Allocate a fresh, globally unique actor-id range for
                        // every actor of the finished job, then remap old -> new.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // old actor id -> new global actor id. Iteration order of
                        // the HashMaps is arbitrary; only uniqueness matters here.
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        // Rewrite the stream actors under their new ids.
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        // Rewrite fragment-level actor maps to the new ids;
                        // `take` swaps the map out to avoid borrowing issues.
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        // Carry over each (new) actor's connector split assignment
                        // into the Update mutation.
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // The merged job now belongs to the database's root
                        // partial graph (no per-job graph id).
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        // Build edges covering both the existing upstream
                        // fragments and the job's (remapped) fragments.
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|fragment| {
                                    (
                                        fragment.fragment_id,
                                        EdgeBuilderFragmentInfo::from_inflight(
                                            fragment,
                                            partial_graph_id,
                                            partial_graph_manager.control_stream_manager(),
                                        ),
                                    )
                                }),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        // Per-worker actor build info for the job's fragments; the
                        // empty `[]` appears to be a "no extra subscribers/inputs"
                        // placeholder — confirm against `collect_actors_to_create`.
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        // For each upstream fragment, translate its freshly built
                        // dispatchers into PbDispatcherUpdate entries. The
                        // "removed" ids are the OLD actor ids (reverse-looked-up
                        // through `actor_mapping`), the "added" ids are the new
                        // ones — swapping the upstream's downstream targets.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    // Reverse lookup: new id -> old id.
                                                    // Linear scan per id; actor counts
                                                    // are presumably small enough here.
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        // All built dispatchers must have been consumed above.
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        // Record the new actors both in the barrier's node->actor
                        // index and in the creation payload.
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        // Register the merged job as an existing (Created) job in
                        // the inflight database info.
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    // The merge is expressed as an Update mutation carrying only
                    // dispatcher updates, split assignments, and dropped
                    // subscriptions; all other fields stay empty.
                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // No merge this barrier: optionally kick off backfill for
                    // fragments that were queued as pending.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        // Propagate this upstream barrier to every still-creating job (jobs that
        // just finished above are excluded — they already consumed it via
        // `start_consume_upstream`).
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                // If a throttle command targets this creating job, attach the
                // throttle mutation (and steal the notifiers so completion is
                // acked from the creating job's side).
                let throttle_mutation = if let Some((ref jobs, ref config)) =
                    throttle_for_creating_jobs
                    && jobs.contains(job_id)
                {
                    // A snapshot-backfill job's rate limit must be altered alone.
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    partial_graph_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        // Finally inject the barrier (with the mutation decided above) into the
        // database's root partial graph.
        partial_graph_manager.inject_barrier(
            to_partial_graph_id(self.database_id, None),
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            command: post_collect_command,
        })
    }
}