1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
45use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
48use crate::barrier::info::{
49 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
50 SubscriberType,
51};
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
54use crate::barrier::rpc::to_partial_graph_id;
55use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::scale::{
58 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
59 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
60};
61use crate::model::{
62 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
63 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
64};
65use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
66use crate::stream::{
67 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
68 fill_snapshot_backfill_epoch,
69};
70
/// Per-database barrier scheduling state owned by the barrier worker.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The `prev_epoch` that the next barrier will carry, i.e. the curr epoch
    /// of the most recently issued barrier (see `next_barrier_info`).
    in_flight_prev_epoch: TracedEpoch,

    /// Prev epochs of the non-checkpoint barriers issued since the last
    /// checkpoint barrier; drained into `BarrierKind::Checkpoint` when the
    /// next checkpoint barrier is produced.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is currently paused (set by the
    /// Pause/Resume commands).
    is_paused: bool,
}
85
86impl BarrierWorkerState {
87 pub(super) fn new() -> Self {
88 Self {
89 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
90 pending_non_checkpoint_barriers: vec![],
91 is_paused: false,
92 }
93 }
94
95 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
96 Self {
97 in_flight_prev_epoch,
98 pending_non_checkpoint_barriers: vec![],
99 is_paused,
100 }
101 }
102
103 pub fn is_paused(&self) -> bool {
104 self.is_paused
105 }
106
107 fn set_is_paused(&mut self, is_paused: bool) {
108 if self.is_paused != is_paused {
109 tracing::info!(
110 currently_paused = self.is_paused,
111 newly_paused = is_paused,
112 "update paused state"
113 );
114 self.is_paused = is_paused;
115 }
116 }
117
118 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
119 &self.in_flight_prev_epoch
120 }
121
122 pub fn next_barrier_info(
124 &mut self,
125 is_checkpoint: bool,
126 curr_epoch: TracedEpoch,
127 ) -> BarrierInfo {
128 assert!(
129 self.in_flight_prev_epoch.value() < curr_epoch.value(),
130 "curr epoch regress. {} > {}",
131 self.in_flight_prev_epoch.value(),
132 curr_epoch.value()
133 );
134 let prev_epoch = self.in_flight_prev_epoch.clone();
135 self.in_flight_prev_epoch = curr_epoch.clone();
136 self.pending_non_checkpoint_barriers
137 .push(prev_epoch.value().0);
138 let kind = if is_checkpoint {
139 let epochs = take(&mut self.pending_non_checkpoint_barriers);
140 BarrierKind::Checkpoint(epochs)
141 } else {
142 BarrierKind::Barrier
143 };
144 BarrierInfo {
145 prev_epoch,
146 curr_epoch,
147 kind,
148 }
149 }
150}
151
/// Result of `apply_command`: the set of creating streaming jobs that this
/// barrier must wait on before completing.
pub(super) struct ApplyCommandInfo {
    // Job ids of creating streaming jobs to wait for.
    pub jobs_to_wait: HashSet<JobId>,
}
155
/// Intermediate result produced while applying a command for one barrier.
type ApplyCommandResult = (
    // Mutation to inject with the barrier, if any.
    Option<Mutation>,
    // State table ids to commit at this barrier.
    HashSet<TableId>,
    // New actors to build on compute nodes, if the command creates any.
    Option<StreamJobActorsToCreate>,
    // Actor ids to collect the barrier from, grouped by worker.
    HashMap<WorkerId, HashSet<ActorId>>,
    // Follow-up work to run after the barrier is collected.
    PostCollectCommand,
);
165
/// Output of actor rendering (`render_actors`): the concrete stream actors
/// per fragment and the worker each actor is placed on.
pub(super) struct RenderResult {
    /// Rendered actors, grouped by fragment id.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker assignment for every rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
173
174fn resolve_no_shuffle_ensembles(
183 fragments: &StreamJobFragmentsToCreate,
184 upstream_fragment_downstreams: &FragmentDownstreamRelation,
185) -> MetaResult<Vec<NoShuffleEnsemble>> {
186 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
188
189 for (upstream_fid, relations) in &fragments.downstreams {
191 for rel in relations {
192 if rel.dispatcher_type == DispatcherType::NoShuffle {
193 new_no_shuffle
194 .entry(*upstream_fid)
195 .or_default()
196 .insert(rel.downstream_fragment_id);
197 }
198 }
199 }
200
201 for (upstream_fid, relations) in upstream_fragment_downstreams {
203 for rel in relations {
204 if rel.dispatcher_type == DispatcherType::NoShuffle {
205 new_no_shuffle
206 .entry(*upstream_fid)
207 .or_default()
208 .insert(rel.downstream_fragment_id);
209 }
210 }
211 }
212
213 let mut ensembles = if new_no_shuffle.is_empty() {
214 Vec::new()
215 } else {
216 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
218 .iter()
219 .flat_map(|(upstream_fid, downstream_fids)| {
220 downstream_fids
221 .iter()
222 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
223 })
224 .collect();
225
226 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
227 .iter()
228 .flat_map(|(u, d)| [*u, *d])
229 .collect::<HashSet<_>>()
230 .into_iter()
231 .collect();
232
233 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
234 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
235 };
236
237 let covered: HashSet<FragmentId> = ensembles
239 .iter()
240 .flat_map(|e| e.component_fragments())
241 .collect();
242 for fragment_id in fragments.inner.fragments.keys() {
243 if !covered.contains(fragment_id) {
244 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
245 }
246 }
247
248 Ok(ensembles)
249}
250
/// Renders the concrete actors (ids, worker placement and vnode bitmaps) for
/// every fragment of `fragments`, ensemble by ensemble.
///
/// All fragments within one no-shuffle ensemble share a single
/// [`EnsembleActorTemplate`] so that their actors line up 1:1. If the
/// ensemble contains fragments that already exist in `database_info` (i.e.
/// are not part of the new job), the template is taken from one of those so
/// the new fragments align with the running ones; otherwise a fresh template
/// is rendered from the worker map and parallelism strategy.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // fragment id -> actor id -> (worker the actor runs on, its vnode bitmap).
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Ensemble members that are NOT part of the new job, i.e. fragments
        // already running and tracked in `database_info`.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            // Align with an already-running fragment of the ensemble.
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // All other existing members must already agree on the placement;
            // a misaligned ensemble is an invariant violation.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // Entirely new ensemble: derive distribution type and vnode count
            // from its first component fragment.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // Sharing one template is only sound if every component uses the
            // same vnode count.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                    different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Assign actors for the new-job fragments of this ensemble; existing
        // fragments keep their running actors untouched.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue;
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Materialize the assignments into `StreamActor`s and the per-actor
    // worker location map.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
378impl DatabaseCheckpointControl {
379 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
381 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
382 let node_actors =
383 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
384 (table_ids_to_commit, node_actors)
385 }
386
387 fn apply_simple_command(
391 &self,
392 mutation: Option<Mutation>,
393 command_name: &'static str,
394 ) -> ApplyCommandResult {
395 let (table_ids, node_actors) = self.collect_base_info();
396 (
397 mutation,
398 table_ids,
399 None,
400 node_actors,
401 PostCollectCommand::Command(command_name.to_owned()),
402 )
403 }
404
405 pub(super) fn apply_command(
408 &mut self,
409 command: Option<Command>,
410 notifiers: &mut Vec<Notifier>,
411 barrier_info: BarrierInfo,
412 partial_graph_manager: &mut PartialGraphManager,
413 hummock_version_stats: &HummockVersionStats,
414 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
415 worker_nodes: &HashMap<WorkerId, WorkerNode>,
416 ) -> MetaResult<ApplyCommandInfo> {
417 debug_assert!(
418 !matches!(
419 command,
420 Some(Command::RescheduleIntent {
421 reschedule_plan: None,
422 ..
423 })
424 ),
425 "reschedule intent must be resolved before apply"
426 );
427 if matches!(
428 command,
429 Some(Command::RescheduleIntent {
430 reschedule_plan: None,
431 ..
432 })
433 ) {
434 bail!("reschedule intent must be resolved before apply");
435 }
436
        /// Resolves the concrete source split assignment for a new streaming
        /// job: maps the planner-level per-fragment split assignment onto the
        /// freshly rendered actors, then extends it with backfill splits
        /// inherited through the new no-shuffle pairs.
        fn resolve_source_splits(
            info: &CreateStreamingJobCommandInfo,
            render_result: &RenderResult,
            actor_no_shuffle: &ActorNewNoShuffle,
            database_info: &InflightDatabaseInfo,
        ) -> MetaResult<SplitAssignment> {
            // Actor ids per fragment, as rendered for this job.
            let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
                .stream_actors
                .iter()
                .map(|(fragment_id, actors)| {
                    (
                        *fragment_id,
                        actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
                    )
                })
                .collect();
            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
                &info.stream_job_fragments,
                &info.init_split_assignment,
                &fragment_actor_ids,
            )?;
            // Backfill actors take the splits of the upstream actor they are
            // no-shuffle-paired with, looked up from the inflight fragments.
            resolved.extend(SourceManager::resolve_backfill_splits(
                &info.stream_job_fragments,
                actor_no_shuffle,
                |fragment_id, actor_id| {
                    database_info
                        .fragment(fragment_id)
                        .actors
                        .get(&actor_id)
                        .map(|info| info.splits.clone())
                },
            )?);
            Ok(resolved)
        }
475
476 let mut throttle_for_creating_jobs: Option<(
478 HashSet<JobId>,
479 HashMap<FragmentId, ThrottleConfig>,
480 )> = None;
481
482 let (
486 mutation,
487 mut table_ids_to_commit,
488 mut actors_to_create,
489 mut node_actors,
490 post_collect_command,
491 ) = match command {
492 None => self.apply_simple_command(None, "barrier"),
493 Some(Command::CreateStreamingJob {
494 mut info,
495 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
496 cross_db_snapshot_backfill_info,
497 }) => {
498 let ensembles = resolve_no_shuffle_ensembles(
499 &info.stream_job_fragments,
500 &info.upstream_fragment_downstreams,
501 )?;
502 let actors = render_actors(
503 &info.stream_job_fragments,
504 &self.database_info,
505 &info.definition,
506 &info.stream_job_fragments.inner.ctx,
507 &info.streaming_job_model,
508 partial_graph_manager
509 .control_stream_manager()
510 .env
511 .actor_id_generator(),
512 worker_nodes,
513 adaptive_parallelism_strategy,
514 &ensembles,
515 &info.database_resource_group,
516 )?;
517 {
518 assert!(!self.state.is_paused());
519 let snapshot_epoch = barrier_info.prev_epoch();
520 for snapshot_backfill_epoch in snapshot_backfill_info
522 .upstream_mv_table_id_to_backfill_epoch
523 .values_mut()
524 {
525 assert_eq!(
526 snapshot_backfill_epoch.replace(snapshot_epoch),
527 None,
528 "must not set previously"
529 );
530 }
531 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
532 fill_snapshot_backfill_epoch(
533 &mut fragment.nodes,
534 Some(&snapshot_backfill_info),
535 &cross_db_snapshot_backfill_info,
536 )?;
537 }
538 let job_id = info.stream_job_fragments.stream_job_id();
539 let snapshot_backfill_upstream_tables = snapshot_backfill_info
540 .upstream_mv_table_id_to_backfill_epoch
541 .keys()
542 .cloned()
543 .collect();
544
545 let mut edges = self.database_info.build_edge(
547 Some((&info, true)),
548 None,
549 None,
550 partial_graph_manager.control_stream_manager(),
551 &actors.stream_actors,
552 &actors.actor_location,
553 );
554 let resolved_split_assignment = resolve_source_splits(
556 &info,
557 &actors,
558 edges.actor_new_no_shuffle(),
559 &self.database_info,
560 )?;
561
562 let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
563 else {
564 panic!("duplicated creating snapshot backfill job {job_id}");
565 };
566
567 let job = CreatingStreamingJobControl::new(
568 entry,
569 CreateSnapshotBackfillJobCommandInfo {
570 info: info.clone(),
571 snapshot_backfill_info: snapshot_backfill_info.clone(),
572 cross_db_snapshot_backfill_info,
573 resolved_split_assignment: resolved_split_assignment.clone(),
574 },
575 take(notifiers),
576 snapshot_backfill_upstream_tables,
577 snapshot_epoch,
578 hummock_version_stats,
579 partial_graph_manager,
580 &mut edges,
581 &resolved_split_assignment,
582 &actors,
583 )?;
584
585 self.database_info
586 .shared_actor_infos
587 .upsert(self.database_id, job.fragment_infos_with_job_id());
588
589 for upstream_mv_table_id in snapshot_backfill_info
590 .upstream_mv_table_id_to_backfill_epoch
591 .keys()
592 {
593 self.database_info.register_subscriber(
594 upstream_mv_table_id.as_job_id(),
595 info.streaming_job.id().as_subscriber_id(),
596 SubscriberType::SnapshotBackfill,
597 );
598 }
599
600 let mutation = Command::create_streaming_job_to_mutation(
601 &info,
602 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
603 self.state.is_paused(),
604 &mut edges,
605 partial_graph_manager.control_stream_manager(),
606 None,
607 &resolved_split_assignment,
608 &actors.stream_actors,
609 &actors.actor_location,
610 )?;
611
612 let (table_ids, node_actors) = self.collect_base_info();
613 (
614 Some(mutation),
615 table_ids,
616 None,
617 node_actors,
618 PostCollectCommand::barrier(),
619 )
620 }
621 }
622 Some(Command::CreateStreamingJob {
623 mut info,
624 job_type,
625 cross_db_snapshot_backfill_info,
626 }) => {
627 let ensembles = resolve_no_shuffle_ensembles(
628 &info.stream_job_fragments,
629 &info.upstream_fragment_downstreams,
630 )?;
631 let actors = render_actors(
632 &info.stream_job_fragments,
633 &self.database_info,
634 &info.definition,
635 &info.stream_job_fragments.inner.ctx,
636 &info.streaming_job_model,
637 partial_graph_manager
638 .control_stream_manager()
639 .env
640 .actor_id_generator(),
641 worker_nodes,
642 adaptive_parallelism_strategy,
643 &ensembles,
644 &info.database_resource_group,
645 )?;
646 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
647 fill_snapshot_backfill_epoch(
648 &mut fragment.nodes,
649 None,
650 &cross_db_snapshot_backfill_info,
651 )?;
652 }
653
654 let new_upstream_sink =
656 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
657 Some(ctx)
658 } else {
659 None
660 };
661
662 let mut edges = self.database_info.build_edge(
663 Some((&info, false)),
664 None,
665 new_upstream_sink,
666 partial_graph_manager.control_stream_manager(),
667 &actors.stream_actors,
668 &actors.actor_location,
669 );
670 let resolved_split_assignment = resolve_source_splits(
672 &info,
673 &actors,
674 edges.actor_new_no_shuffle(),
675 &self.database_info,
676 )?;
677
678 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
680 let (fragment, _) =
681 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
682 .expect("should have parallel cdc fragment");
683 Some(CdcTableBackfillTracker::new(
684 fragment.fragment_id,
685 splits.clone(),
686 ))
687 } else {
688 None
689 };
690 self.database_info
691 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
692 self.database_info.pre_apply_new_fragments(
693 info.stream_job_fragments
694 .new_fragment_info(
695 &actors.stream_actors,
696 &actors.actor_location,
697 &resolved_split_assignment,
698 )
699 .map(|(fragment_id, fragment_infos)| {
700 (fragment_id, info.streaming_job.id(), fragment_infos)
701 }),
702 );
703 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
704 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
705 self.database_info.pre_apply_add_node_upstream(
706 downstream_fragment_id,
707 &PbUpstreamSinkInfo {
708 upstream_fragment_id: ctx.sink_fragment_id,
709 sink_output_schema: ctx.sink_output_fields.clone(),
710 project_exprs: ctx.project_exprs.clone(),
711 },
712 );
713 }
714
715 let (table_ids, node_actors) = self.collect_base_info();
716
717 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
719 &info,
720 &mut edges,
721 &actors.stream_actors,
722 &actors.actor_location,
723 ));
724
725 let actor_cdc_table_snapshot_splits = self
727 .database_info
728 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
729
730 let is_currently_paused = self.state.is_paused();
732 let mutation = Command::create_streaming_job_to_mutation(
733 &info,
734 &job_type,
735 is_currently_paused,
736 &mut edges,
737 partial_graph_manager.control_stream_manager(),
738 actor_cdc_table_snapshot_splits,
739 &resolved_split_assignment,
740 &actors.stream_actors,
741 &actors.actor_location,
742 )?;
743
744 (
745 Some(mutation),
746 table_ids,
747 actors_to_create,
748 node_actors,
749 PostCollectCommand::CreateStreamingJob {
750 info,
751 job_type,
752 cross_db_snapshot_backfill_info,
753 resolved_split_assignment,
754 },
755 )
756 }
757
758 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
759
760 Some(Command::Pause) => {
761 let prev_is_paused = self.state.is_paused();
762 self.state.set_is_paused(true);
763 let mutation = Command::pause_to_mutation(prev_is_paused);
764 let (table_ids, node_actors) = self.collect_base_info();
765 (
766 mutation,
767 table_ids,
768 None,
769 node_actors,
770 PostCollectCommand::Command("Pause".to_owned()),
771 )
772 }
773
774 Some(Command::Resume) => {
775 let prev_is_paused = self.state.is_paused();
776 self.state.set_is_paused(false);
777 let mutation = Command::resume_to_mutation(prev_is_paused);
778 let (table_ids, node_actors) = self.collect_base_info();
779 (
780 mutation,
781 table_ids,
782 None,
783 node_actors,
784 PostCollectCommand::Command("Resume".to_owned()),
785 )
786 }
787
788 Some(Command::Throttle { jobs, config }) => {
789 let mutation = Some(Command::throttle_to_mutation(&config));
790 throttle_for_creating_jobs = Some((jobs, config));
791 self.apply_simple_command(mutation, "Throttle")
792 }
793
794 Some(Command::DropStreamingJobs {
795 streaming_job_ids,
796 unregistered_state_table_ids,
797 unregistered_fragment_ids,
798 dropped_sink_fragment_by_targets,
799 }) => {
800 let actors = self
801 .database_info
802 .fragment_infos()
803 .filter(|fragment| {
804 self.database_info
805 .job_id_by_fragment(fragment.fragment_id)
806 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
807 })
808 .flat_map(|fragment| fragment.actors.keys().copied())
809 .collect::<Vec<_>>();
810
811 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
813 self.database_info
814 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
815 }
816
817 let (table_ids, node_actors) = self.collect_base_info();
818
819 self.database_info
821 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
822
823 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
824 &actors,
825 &dropped_sink_fragment_by_targets,
826 ));
827 (
828 mutation,
829 table_ids,
830 None,
831 node_actors,
832 PostCollectCommand::DropStreamingJobs {
833 streaming_job_ids,
834 unregistered_state_table_ids,
835 },
836 )
837 }
838
839 Some(Command::RescheduleIntent {
840 reschedule_plan, ..
841 }) => {
842 let ReschedulePlan {
843 reschedules,
844 fragment_actors,
845 } = reschedule_plan
846 .as_ref()
847 .expect("reschedule intent should be resolved in global barrier worker");
848
849 for (fragment_id, reschedule) in reschedules {
851 self.database_info.pre_apply_reschedule(
852 *fragment_id,
853 reschedule
854 .added_actors
855 .iter()
856 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
857 actors.iter().map(|actor_id| {
858 (
859 *actor_id,
860 InflightActorInfo {
861 worker_id: *node_id,
862 vnode_bitmap: reschedule
863 .newly_created_actors
864 .get(actor_id)
865 .expect("should exist")
866 .0
867 .0
868 .vnode_bitmap
869 .clone(),
870 splits: reschedule
871 .actor_splits
872 .get(actor_id)
873 .cloned()
874 .unwrap_or_default(),
875 },
876 )
877 })
878 })
879 .collect(),
880 reschedule
881 .vnode_bitmap_updates
882 .iter()
883 .filter(|(actor_id, _)| {
884 !reschedule.newly_created_actors.contains_key(*actor_id)
885 })
886 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
887 .collect(),
888 reschedule.actor_splits.clone(),
889 );
890 }
891
892 let (table_ids, node_actors) = self.collect_base_info();
893
894 let actors_to_create = Some(Command::reschedule_actors_to_create(
896 reschedules,
897 fragment_actors,
898 &self.database_info,
899 partial_graph_manager.control_stream_manager(),
900 ));
901
902 self.database_info
904 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
905 (
906 *fragment_id,
907 reschedule.removed_actors.iter().cloned().collect(),
908 )
909 }));
910
911 let mutation = Command::reschedule_to_mutation(
913 reschedules,
914 fragment_actors,
915 partial_graph_manager.control_stream_manager(),
916 &mut self.database_info,
917 )?;
918
919 let reschedules = reschedule_plan
920 .expect("reschedule intent should be resolved in global barrier worker")
921 .reschedules;
922 (
923 mutation,
924 table_ids,
925 actors_to_create,
926 node_actors,
927 PostCollectCommand::Reschedule { reschedules },
928 )
929 }
930
931 Some(Command::ReplaceStreamJob(plan)) => {
932 let ensembles = resolve_no_shuffle_ensembles(
933 &plan.new_fragments,
934 &plan.upstream_fragment_downstreams,
935 )?;
936 let mut render_result = render_actors(
937 &plan.new_fragments,
938 &self.database_info,
939 "", &plan.new_fragments.inner.ctx,
941 &plan.streaming_job_model,
942 partial_graph_manager
943 .control_stream_manager()
944 .env
945 .actor_id_generator(),
946 worker_nodes,
947 adaptive_parallelism_strategy,
948 &ensembles,
949 &plan.database_resource_group,
950 )?;
951
952 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
955 let actor_id_counter = partial_graph_manager
956 .control_stream_manager()
957 .env
958 .actor_id_generator();
959 for sink_ctx in sinks {
960 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
961 let original_frag_info = self.database_info.fragment(original_fragment_id);
962 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
963 original_frag_info,
964 );
965 let new_aligner = ComponentFragmentAligner::new_persistent(
966 &actor_template,
967 actor_id_counter,
968 );
969 let distribution_type: DistributionType =
970 sink_ctx.new_fragment.distribution_type.into();
971 let actor_assignments =
972 new_aligner.align_component_actor(distribution_type);
973 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
974 let mut actors = Vec::with_capacity(actor_assignments.len());
975 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
976 render_result.actor_location.insert(actor_id, *worker_id);
977 actors.push(StreamActor {
978 actor_id,
979 fragment_id: new_fragment_id,
980 vnode_bitmap: vnode_bitmap.clone(),
981 mview_definition: String::new(),
982 expr_context: Some(sink_ctx.ctx.to_expr_context()),
983 config_override: sink_ctx.ctx.config_override.clone(),
984 });
985 }
986 render_result.stream_actors.insert(new_fragment_id, actors);
987 }
988 }
989
990 let mut edges = self.database_info.build_edge(
992 None,
993 Some(&plan),
994 None,
995 partial_graph_manager.control_stream_manager(),
996 &render_result.stream_actors,
997 &render_result.actor_location,
998 );
999
1000 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1002 .stream_actors
1003 .iter()
1004 .map(|(fragment_id, actors)| {
1005 (
1006 *fragment_id,
1007 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1008 )
1009 })
1010 .collect();
1011 let resolved_split_assignment = match &plan.split_plan {
1012 ReplaceJobSplitPlan::Discovered(discovered) => {
1013 SourceManager::resolve_fragment_to_actor_splits(
1014 &plan.new_fragments,
1015 discovered,
1016 &fragment_actor_ids,
1017 )?
1018 }
1019 ReplaceJobSplitPlan::AlignFromPrevious => {
1020 SourceManager::resolve_replace_source_splits(
1021 &plan.new_fragments,
1022 &plan.replace_upstream,
1023 edges.actor_new_no_shuffle(),
1024 |_fragment_id, actor_id| {
1025 self.database_info.fragment_infos().find_map(|fragment| {
1026 fragment
1027 .actors
1028 .get(&actor_id)
1029 .map(|info| info.splits.clone())
1030 })
1031 },
1032 )?
1033 }
1034 };
1035
1036 self.database_info.pre_apply_new_fragments(
1038 plan.new_fragments
1039 .new_fragment_info(
1040 &render_result.stream_actors,
1041 &render_result.actor_location,
1042 &resolved_split_assignment,
1043 )
1044 .map(|(fragment_id, new_fragment)| {
1045 (fragment_id, plan.streaming_job.id(), new_fragment)
1046 }),
1047 );
1048 for (fragment_id, replace_map) in &plan.replace_upstream {
1049 self.database_info
1050 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1051 }
1052 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1053 self.database_info
1054 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1055 (
1056 sink.new_fragment.fragment_id,
1057 sink.original_sink.id.as_job_id(),
1058 sink.new_fragment_info(
1059 &render_result.stream_actors,
1060 &render_result.actor_location,
1061 ),
1062 )
1063 }));
1064 }
1065
1066 let (table_ids, node_actors) = self.collect_base_info();
1067
1068 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1070 &plan,
1071 &mut edges,
1072 &self.database_info,
1073 &render_result.stream_actors,
1074 &render_result.actor_location,
1075 ));
1076
1077 let mutation = Command::replace_stream_job_to_mutation(
1080 &plan,
1081 &mut edges,
1082 &mut self.database_info,
1083 &resolved_split_assignment,
1084 )?;
1085
1086 {
1088 let mut fragment_ids_to_remove: Vec<_> = plan
1089 .old_fragments
1090 .fragments
1091 .values()
1092 .map(|f| f.fragment_id)
1093 .collect();
1094 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1095 fragment_ids_to_remove
1096 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1097 }
1098 self.database_info
1099 .post_apply_remove_fragments(fragment_ids_to_remove);
1100 }
1101
1102 (
1103 mutation,
1104 table_ids,
1105 actors_to_create,
1106 node_actors,
1107 PostCollectCommand::ReplaceStreamJob {
1108 plan,
1109 resolved_split_assignment,
1110 },
1111 )
1112 }
1113
1114 Some(Command::SourceChangeSplit(split_state)) => {
1115 self.database_info.pre_apply_split_assignments(
1117 split_state
1118 .split_assignment
1119 .iter()
1120 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1121 );
1122
1123 let mutation = Some(Command::source_change_split_to_mutation(
1124 &split_state.split_assignment,
1125 ));
1126 let (table_ids, node_actors) = self.collect_base_info();
1127 (
1128 mutation,
1129 table_ids,
1130 None,
1131 node_actors,
1132 PostCollectCommand::SourceChangeSplit {
1133 split_assignment: split_state.split_assignment,
1134 },
1135 )
1136 }
1137
1138 Some(Command::CreateSubscription {
1139 subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Record the new subscriber so the upstream MV's job tracks this
                // subscription (with its retention) from this barrier onward.
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                // Broadcast the subscription creation to the actors via a barrier mutation.
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // Unregistering may find nothing (e.g. duplicated drop); this is
                // tolerated with a warning rather than failing the barrier.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Only meta-side bookkeeping changes; no mutation needs to reach
                // the actors, hence the `None` passed to the simple-command path.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    // The config is carried into the post-collect phase so it can be
                    // applied after the barrier is collected.
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                // Only the source id is needed for the mutation; `table_id` is unused here.
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                // Building the mutation can fail (propagated via `?`), e.g. if the
                // target cannot be resolved against the current database info.
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };

        // Jobs whose snapshot backfill finishes at this barrier; returned to the
        // caller as `jobs_to_wait` at the end of this function.
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        // If the command itself produced no mutation, this barrier may still need
        // one: either to merge finished snapshot-backfill jobs into the upstream
        // graph, or to start pending fragment backfills.
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                // Merging into upstream only happens on checkpoint barriers.
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, &barrier_info)?;
                            // Job ids are map keys, so duplicates would indicate a bug.
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    // Shadow the outer Option with its (possibly fresh) inner map.
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // The job subscribed to its upstream MVs during snapshot
                        // backfill; those subscriptions are dropped in the same
                        // Update mutation now that it consumes upstream directly.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            // Each upstream must have exactly a SnapshotBackfill-type
                            // subscriber registered for this job; anything else is a bug.
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The merged job's state tables now commit with this database.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Re-key every actor of the finishing job with freshly
                        // allocated global actor ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // old actor id -> new global actor id
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        // Stream actors re-keyed (and their `actor_id` field rewritten).
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        // Fragment infos with their actor maps re-keyed to the new ids.
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        // Carry over each (new) actor's connector splits into the
                        // Update mutation.
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // The merged job joins the database's root partial graph
                        // (`None` = no creating-job sub-graph).
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        // Build dispatcher/merge edges between the existing upstream
                        // fragments and the job's (re-keyed) fragments.
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|fragment| {
                                    (
                                        fragment.fragment_id,
                                        EdgeBuilderFragmentInfo::from_inflight(
                                            fragment,
                                            partial_graph_id,
                                            partial_graph_manager.control_stream_manager(),
                                        ),
                                    )
                                }),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    // No extra per-fragment payload here (empty slice).
                                    [],
                                )
                            }),
                        );
                        // Upstream dispatchers are updated so they route to the new
                        // actors. NOTE(review): `removed_downstream_actor_id` is filled
                        // with the OLD ids found by reverse-looking-up `actor_mapping`
                        // from each new downstream id — presumably the upstream actors
                        // previously dispatched to the old ids; confirm against the
                        // executor-side Update handling.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            // Linear reverse lookup of the
                                                            // old id for this new id.
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        // Every built edge must have been consumed above.
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        // Register the new actors both in the per-node actor lists for
                        // this barrier and in the set of actors to create.
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        // The job is now part of the database's inflight info, marked
                        // as fully created.
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    // A single Update mutation carries the dispatcher rewires, the
                    // merged actors' splits, and the dropped backfill subscriptions.
                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // No jobs to merge: check whether any fragments are waiting to
                    // start backfilling and, if so, kick them off with this barrier.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        // Forward the barrier to every still-creating job (jobs merged above are
        // excluded — they already received it via `start_consume_upstream`).
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                // A throttle command targeting this creating job is delivered as a
                // mutation on its own sub-graph barrier rather than the main one.
                let throttle_mutation = if let Some((ref jobs, ref config)) =
                    throttle_for_creating_jobs
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        // `take` empties `notifiers`, so the later `take(notifiers)`
                        // below will pass an empty list for this barrier.
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    partial_graph_manager,
                    &barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        // Finally inject the barrier (with whatever mutation was derived above)
        // into the database's root partial graph.
        partial_graph_manager.inject_barrier(
            to_partial_graph_id(self.database_id, None),
            mutation,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
            PartialGraphBarrierInfo::new(
                post_collect_command,
                barrier_info,
                take(notifiers),
                table_ids_to_commit,
            ),
        )?;

        Ok(ApplyCommandInfo {
            // Callers must wait for these merged jobs before the barrier completes.
            jobs_to_wait: finished_snapshot_backfill_jobs,
        })
    }
1553}