1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{
45 CreatingStreamingJobControl, DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52 SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
61 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70 fill_snapshot_backfill_epoch,
71};
72
/// Mutable barrier-injection state of a barrier worker: tracks the epoch of
/// the most recently injected barrier, the non-checkpoint epochs accumulated
/// since the last checkpoint, and whether the graph is paused.
pub(in crate::barrier) struct BarrierWorkerState {
    /// Prev epoch of the most recently injected (still in-flight) barrier.
    /// Becomes the `prev_epoch` of the next barrier in `next_barrier_info`.
    in_flight_prev_epoch: TracedEpoch,

    /// Prev epochs of barriers injected since the last checkpoint barrier.
    /// Drained into `BarrierKind::Checkpoint` when the next checkpoint
    /// barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is currently paused; toggled by the
    /// `Pause`/`Resume` commands via `set_is_paused`.
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89 pub(super) fn new() -> Self {
90 Self {
91 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92 pending_non_checkpoint_barriers: vec![],
93 is_paused: false,
94 }
95 }
96
97 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98 Self {
99 in_flight_prev_epoch,
100 pending_non_checkpoint_barriers: vec![],
101 is_paused,
102 }
103 }
104
105 pub fn is_paused(&self) -> bool {
106 self.is_paused
107 }
108
109 fn set_is_paused(&mut self, is_paused: bool) {
110 if self.is_paused != is_paused {
111 tracing::info!(
112 currently_paused = self.is_paused,
113 newly_paused = is_paused,
114 "update paused state"
115 );
116 self.is_paused = is_paused;
117 }
118 }
119
120 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121 &self.in_flight_prev_epoch
122 }
123
124 pub fn next_barrier_info(
126 &mut self,
127 is_checkpoint: bool,
128 curr_epoch: TracedEpoch,
129 ) -> BarrierInfo {
130 assert!(
131 self.in_flight_prev_epoch.value() < curr_epoch.value(),
132 "curr epoch regress. {} > {}",
133 self.in_flight_prev_epoch.value(),
134 curr_epoch.value()
135 );
136 let prev_epoch = self.in_flight_prev_epoch.clone();
137 self.in_flight_prev_epoch = curr_epoch.clone();
138 self.pending_non_checkpoint_barriers
139 .push(prev_epoch.value().0);
140 let kind = if is_checkpoint {
141 let epochs = take(&mut self.pending_non_checkpoint_barriers);
142 BarrierKind::Checkpoint(epochs)
143 } else {
144 BarrierKind::Barrier
145 };
146 BarrierInfo {
147 prev_epoch,
148 curr_epoch,
149 kind,
150 }
151 }
152}
153
/// Extra information produced by `apply_command` for the caller.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): presumably the set of creating jobs whose completion the
    // injected barrier must wait on — confirm against the caller; how this
    // field is populated is not visible in this chunk.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Intermediate result of applying a command to a database, in positional order:
/// 1. the barrier mutation to inject, if any;
/// 2. the table ids to commit at this barrier;
/// 3. new actors to build on worker nodes, if the command creates any;
/// 4. per-worker actor ids from which the barrier must be collected;
/// 5. the command to execute after the barrier is collected.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
167
/// Result of rendering a streaming job's actors (see `render_actors`).
pub(crate) struct RenderResult {
    /// The rendered actors of each fragment.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// The worker node each rendered actor is placed on.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176fn resolve_no_shuffle_ensembles(
185 fragments: &StreamJobFragmentsToCreate,
186 upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191 for (upstream_fid, relations) in &fragments.downstreams {
193 for rel in relations {
194 if rel.dispatcher_type == DispatcherType::NoShuffle {
195 new_no_shuffle
196 .entry(*upstream_fid)
197 .or_default()
198 .insert(rel.downstream_fragment_id);
199 }
200 }
201 }
202
203 for (upstream_fid, relations) in upstream_fragment_downstreams {
205 for rel in relations {
206 if rel.dispatcher_type == DispatcherType::NoShuffle {
207 new_no_shuffle
208 .entry(*upstream_fid)
209 .or_default()
210 .insert(rel.downstream_fragment_id);
211 }
212 }
213 }
214
215 let mut ensembles = if new_no_shuffle.is_empty() {
216 Vec::new()
217 } else {
218 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220 .iter()
221 .flat_map(|(upstream_fid, downstream_fids)| {
222 downstream_fids
223 .iter()
224 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225 })
226 .collect();
227
228 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229 .iter()
230 .flat_map(|(u, d)| [*u, *d])
231 .collect::<HashSet<_>>()
232 .into_iter()
233 .collect();
234
235 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237 };
238
239 let covered: HashSet<FragmentId> = ensembles
241 .iter()
242 .flat_map(|e| e.component_fragments())
243 .collect();
244 for fragment_id in fragments.inner.fragments.keys() {
245 if !covered.contains(fragment_id) {
246 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247 }
248 }
249
250 Ok(ensembles)
251}
252
/// Renders concrete actors (placement + vnode bitmaps) for a streaming job
/// that is about to be created, processing one no-shuffle ensemble at a time
/// so that fragments connected by no-shuffle dispatchers stay aligned.
///
/// For each ensemble: if it contains fragments that already exist in
/// `database_info`, the new fragments are aligned to the first existing one
/// (and all existing members are asserted to agree); otherwise a fresh
/// placement is computed from `worker_map` / `adaptive_parallelism_strategy`.
///
/// NOTE: actor ids are drawn from the shared `actor_id_counter`, so the
/// iteration order over ensembles and fragments determines which ids are
/// assigned — do not reorder these loops casually.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // fragment -> (actor -> (worker, vnode bitmap)); filled per ensemble below.
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Ensemble members that already exist in the inflight database info
        // (i.e. are NOT part of the job being created).
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            // Align to an existing fragment's actor layout.
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // All existing members of the ensemble must already agree with
            // each other; panic loudly if the invariant is broken.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // Entirely new ensemble: derive distribution/vnode count from the
            // first member and render a fresh placement.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // All members must share the vnode count, or alignment is impossible.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                     different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Materialize assignments for the NEW fragments only; existing ones
        // already have actors.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue;
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Flatten the assignments into `StreamActor`s and an actor -> worker map.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
380impl DatabaseCheckpointControl {
381 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
383 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
384 let node_actors =
385 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
386 (table_ids_to_commit, node_actors)
387 }
388
389 fn apply_simple_command(
393 &self,
394 mutation: Option<Mutation>,
395 command_name: &'static str,
396 ) -> ApplyCommandResult {
397 let (table_ids, node_actors) = self.collect_base_info();
398 (
399 mutation,
400 table_ids,
401 None,
402 node_actors,
403 PostCollectCommand::Command(command_name.to_owned()),
404 )
405 }
406
407 pub(super) fn apply_command(
410 &mut self,
411 command: Option<Command>,
412 notifiers: &mut Vec<Notifier>,
413 barrier_info: BarrierInfo,
414 partial_graph_manager: &mut PartialGraphManager,
415 hummock_version_stats: &HummockVersionStats,
416 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
417 worker_nodes: &HashMap<WorkerId, WorkerNode>,
418 ) -> MetaResult<ApplyCommandInfo> {
419 debug_assert!(
420 !matches!(
421 command,
422 Some(Command::RescheduleIntent {
423 reschedule_plan: None,
424 ..
425 })
426 ),
427 "reschedule intent must be resolved before apply"
428 );
429 if matches!(
430 command,
431 Some(Command::RescheduleIntent {
432 reschedule_plan: None,
433 ..
434 })
435 ) {
436 bail!("reschedule intent must be resolved before apply");
437 }
438
439 fn resolve_source_splits(
444 info: &CreateStreamingJobCommandInfo,
445 render_result: &RenderResult,
446 actor_no_shuffle: &ActorNewNoShuffle,
447 database_info: &InflightDatabaseInfo,
448 ) -> MetaResult<SplitAssignment> {
449 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
450 .stream_actors
451 .iter()
452 .map(|(fragment_id, actors)| {
453 (
454 *fragment_id,
455 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
456 )
457 })
458 .collect();
459 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
460 &info.stream_job_fragments,
461 &info.init_split_assignment,
462 &fragment_actor_ids,
463 )?;
464 resolved.extend(SourceManager::resolve_backfill_splits(
465 &info.stream_job_fragments,
466 actor_no_shuffle,
467 |fragment_id, actor_id| {
468 database_info
469 .fragment(fragment_id)
470 .actors
471 .get(&actor_id)
472 .map(|info| info.splits.clone())
473 },
474 )?);
475 Ok(resolved)
476 }
477
478 let mut throttle_for_creating_jobs: Option<(
480 HashSet<JobId>,
481 HashMap<FragmentId, ThrottleConfig>,
482 )> = None;
483
484 let (
488 mutation,
489 mut table_ids_to_commit,
490 mut actors_to_create,
491 mut node_actors,
492 post_collect_command,
493 ) = match command {
494 None => self.apply_simple_command(None, "barrier"),
495 Some(Command::CreateStreamingJob {
496 mut info,
497 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
498 cross_db_snapshot_backfill_info,
499 }) => {
500 let ensembles = resolve_no_shuffle_ensembles(
501 &info.stream_job_fragments,
502 &info.upstream_fragment_downstreams,
503 )?;
504 let actors = render_actors(
505 &info.stream_job_fragments,
506 &self.database_info,
507 &info.definition,
508 &info.stream_job_fragments.inner.ctx,
509 &info.streaming_job_model,
510 partial_graph_manager
511 .control_stream_manager()
512 .env
513 .actor_id_generator(),
514 worker_nodes,
515 adaptive_parallelism_strategy,
516 &ensembles,
517 &info.database_resource_group,
518 )?;
519 {
520 assert!(!self.state.is_paused());
521 let snapshot_epoch = barrier_info.prev_epoch();
522 for snapshot_backfill_epoch in snapshot_backfill_info
524 .upstream_mv_table_id_to_backfill_epoch
525 .values_mut()
526 {
527 assert_eq!(
528 snapshot_backfill_epoch.replace(snapshot_epoch),
529 None,
530 "must not set previously"
531 );
532 }
533 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
534 fill_snapshot_backfill_epoch(
535 &mut fragment.nodes,
536 Some(&snapshot_backfill_info),
537 &cross_db_snapshot_backfill_info,
538 )?;
539 }
540 let job_id = info.stream_job_fragments.stream_job_id();
541 let snapshot_backfill_upstream_tables = snapshot_backfill_info
542 .upstream_mv_table_id_to_backfill_epoch
543 .keys()
544 .cloned()
545 .collect();
546
547 let mut edges = self.database_info.build_edge(
549 Some((&info, true)),
550 None,
551 None,
552 partial_graph_manager.control_stream_manager(),
553 &actors.stream_actors,
554 &actors.actor_location,
555 );
556 let resolved_split_assignment = resolve_source_splits(
558 &info,
559 &actors,
560 edges.actor_new_no_shuffle(),
561 &self.database_info,
562 )?;
563
564 let Entry::Vacant(entry) =
565 self.independent_checkpoint_job_controls.entry(job_id)
566 else {
567 panic!("duplicated creating snapshot backfill job {job_id}");
568 };
569
570 let job = CreatingStreamingJobControl::new(
571 entry,
572 CreateSnapshotBackfillJobCommandInfo {
573 info: info.clone(),
574 snapshot_backfill_info: snapshot_backfill_info.clone(),
575 cross_db_snapshot_backfill_info,
576 resolved_split_assignment: resolved_split_assignment.clone(),
577 },
578 take(notifiers),
579 snapshot_backfill_upstream_tables,
580 snapshot_epoch,
581 hummock_version_stats,
582 partial_graph_manager,
583 &mut edges,
584 &resolved_split_assignment,
585 &actors,
586 )?;
587
588 if let Some(fragment_infos) = job.fragment_infos() {
589 self.database_info.shared_actor_infos.upsert(
590 self.database_id,
591 fragment_infos.values().map(|f| (f, job_id)),
592 );
593 }
594
595 for upstream_mv_table_id in snapshot_backfill_info
596 .upstream_mv_table_id_to_backfill_epoch
597 .keys()
598 {
599 self.database_info.register_subscriber(
600 upstream_mv_table_id.as_job_id(),
601 info.streaming_job.id().as_subscriber_id(),
602 SubscriberType::SnapshotBackfill,
603 );
604 }
605
606 let mutation = Command::create_streaming_job_to_mutation(
607 &info,
608 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
609 self.state.is_paused(),
610 &mut edges,
611 partial_graph_manager.control_stream_manager(),
612 None,
613 &resolved_split_assignment,
614 &actors.stream_actors,
615 &actors.actor_location,
616 )?;
617
618 let (table_ids, node_actors) = self.collect_base_info();
619 (
620 Some(mutation),
621 table_ids,
622 None,
623 node_actors,
624 PostCollectCommand::barrier(),
625 )
626 }
627 }
628 Some(Command::CreateStreamingJob {
629 mut info,
630 job_type,
631 cross_db_snapshot_backfill_info,
632 }) => {
633 let ensembles = resolve_no_shuffle_ensembles(
634 &info.stream_job_fragments,
635 &info.upstream_fragment_downstreams,
636 )?;
637 let actors = render_actors(
638 &info.stream_job_fragments,
639 &self.database_info,
640 &info.definition,
641 &info.stream_job_fragments.inner.ctx,
642 &info.streaming_job_model,
643 partial_graph_manager
644 .control_stream_manager()
645 .env
646 .actor_id_generator(),
647 worker_nodes,
648 adaptive_parallelism_strategy,
649 &ensembles,
650 &info.database_resource_group,
651 )?;
652 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
653 fill_snapshot_backfill_epoch(
654 &mut fragment.nodes,
655 None,
656 &cross_db_snapshot_backfill_info,
657 )?;
658 }
659
660 let new_upstream_sink =
662 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
663 Some(ctx)
664 } else {
665 None
666 };
667
668 let mut edges = self.database_info.build_edge(
669 Some((&info, false)),
670 None,
671 new_upstream_sink,
672 partial_graph_manager.control_stream_manager(),
673 &actors.stream_actors,
674 &actors.actor_location,
675 );
676 let resolved_split_assignment = resolve_source_splits(
678 &info,
679 &actors,
680 edges.actor_new_no_shuffle(),
681 &self.database_info,
682 )?;
683
684 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
686 let (fragment, _) =
687 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
688 .expect("should have parallel cdc fragment");
689 Some(CdcTableBackfillTracker::new(
690 fragment.fragment_id,
691 splits.clone(),
692 ))
693 } else {
694 None
695 };
696 self.database_info
697 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
698 self.database_info.pre_apply_new_fragments(
699 info.stream_job_fragments
700 .new_fragment_info(
701 &actors.stream_actors,
702 &actors.actor_location,
703 &resolved_split_assignment,
704 )
705 .map(|(fragment_id, fragment_infos)| {
706 (fragment_id, info.streaming_job.id(), fragment_infos)
707 }),
708 );
709 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
710 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
711 self.database_info.pre_apply_add_node_upstream(
712 downstream_fragment_id,
713 &PbUpstreamSinkInfo {
714 upstream_fragment_id: ctx.sink_fragment_id,
715 sink_output_schema: ctx.sink_output_fields.clone(),
716 project_exprs: ctx.project_exprs.clone(),
717 },
718 );
719 }
720
721 let (table_ids, node_actors) = self.collect_base_info();
722
723 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
725 &info,
726 &mut edges,
727 &actors.stream_actors,
728 &actors.actor_location,
729 ));
730
731 let actor_cdc_table_snapshot_splits = self
733 .database_info
734 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
735
736 let is_currently_paused = self.state.is_paused();
738 let mutation = Command::create_streaming_job_to_mutation(
739 &info,
740 &job_type,
741 is_currently_paused,
742 &mut edges,
743 partial_graph_manager.control_stream_manager(),
744 actor_cdc_table_snapshot_splits,
745 &resolved_split_assignment,
746 &actors.stream_actors,
747 &actors.actor_location,
748 )?;
749
750 (
751 Some(mutation),
752 table_ids,
753 actors_to_create,
754 node_actors,
755 PostCollectCommand::CreateStreamingJob {
756 info,
757 job_type,
758 cross_db_snapshot_backfill_info,
759 resolved_split_assignment,
760 },
761 )
762 }
763
764 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
765
766 Some(Command::Pause) => {
767 let prev_is_paused = self.state.is_paused();
768 self.state.set_is_paused(true);
769 let mutation = Command::pause_to_mutation(prev_is_paused);
770 let (table_ids, node_actors) = self.collect_base_info();
771 (
772 mutation,
773 table_ids,
774 None,
775 node_actors,
776 PostCollectCommand::Command("Pause".to_owned()),
777 )
778 }
779
780 Some(Command::Resume) => {
781 let prev_is_paused = self.state.is_paused();
782 self.state.set_is_paused(false);
783 let mutation = Command::resume_to_mutation(prev_is_paused);
784 let (table_ids, node_actors) = self.collect_base_info();
785 (
786 mutation,
787 table_ids,
788 None,
789 node_actors,
790 PostCollectCommand::Command("Resume".to_owned()),
791 )
792 }
793
794 Some(Command::Throttle { jobs, config }) => {
795 let mutation = Some(Command::throttle_to_mutation(&config));
796 throttle_for_creating_jobs = Some((jobs, config));
797 self.apply_simple_command(mutation, "Throttle")
798 }
799
800 Some(Command::DropStreamingJobs {
801 streaming_job_ids,
802 unregistered_fragment_ids,
803 dropped_sink_fragment_by_targets,
804 ..
805 }) => {
806 let actors = self
807 .database_info
808 .fragment_infos()
809 .filter(|fragment| {
810 self.database_info
811 .job_id_by_fragment(fragment.fragment_id)
812 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
813 })
814 .flat_map(|fragment| fragment.actors.keys().copied())
815 .collect::<Vec<_>>();
816
817 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
819 self.database_info
820 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
821 }
822
823 let (table_ids, node_actors) = self.collect_base_info();
824
825 self.database_info
827 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
828
829 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
830 &actors,
831 &dropped_sink_fragment_by_targets,
832 ));
833 (
834 mutation,
835 table_ids,
836 None,
837 node_actors,
838 PostCollectCommand::DropStreamingJobs,
839 )
840 }
841
842 Some(Command::RescheduleIntent {
843 reschedule_plan, ..
844 }) => {
845 let ReschedulePlan {
846 reschedules,
847 fragment_actors,
848 } = reschedule_plan
849 .as_ref()
850 .expect("reschedule intent should be resolved in global barrier worker");
851
852 for (fragment_id, reschedule) in reschedules {
854 self.database_info.pre_apply_reschedule(
855 *fragment_id,
856 reschedule
857 .added_actors
858 .iter()
859 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
860 actors.iter().map(|actor_id| {
861 (
862 *actor_id,
863 InflightActorInfo {
864 worker_id: *node_id,
865 vnode_bitmap: reschedule
866 .newly_created_actors
867 .get(actor_id)
868 .expect("should exist")
869 .0
870 .0
871 .vnode_bitmap
872 .clone(),
873 splits: reschedule
874 .actor_splits
875 .get(actor_id)
876 .cloned()
877 .unwrap_or_default(),
878 },
879 )
880 })
881 })
882 .collect(),
883 reschedule
884 .vnode_bitmap_updates
885 .iter()
886 .filter(|(actor_id, _)| {
887 !reschedule.newly_created_actors.contains_key(*actor_id)
888 })
889 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
890 .collect(),
891 reschedule.actor_splits.clone(),
892 );
893 }
894
895 let (table_ids, node_actors) = self.collect_base_info();
896
897 let actors_to_create = Some(Command::reschedule_actors_to_create(
899 reschedules,
900 fragment_actors,
901 &self.database_info,
902 partial_graph_manager.control_stream_manager(),
903 ));
904
905 self.database_info
907 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
908 (
909 *fragment_id,
910 reschedule.removed_actors.iter().cloned().collect(),
911 )
912 }));
913
914 let mutation = Command::reschedule_to_mutation(
916 reschedules,
917 fragment_actors,
918 partial_graph_manager.control_stream_manager(),
919 &mut self.database_info,
920 )?;
921
922 let reschedules = reschedule_plan
923 .expect("reschedule intent should be resolved in global barrier worker")
924 .reschedules;
925 (
926 mutation,
927 table_ids,
928 actors_to_create,
929 node_actors,
930 PostCollectCommand::Reschedule { reschedules },
931 )
932 }
933
934 Some(Command::ReplaceStreamJob(plan)) => {
935 let ensembles = resolve_no_shuffle_ensembles(
936 &plan.new_fragments,
937 &plan.upstream_fragment_downstreams,
938 )?;
939 let mut render_result = render_actors(
940 &plan.new_fragments,
941 &self.database_info,
942 "", &plan.new_fragments.inner.ctx,
944 &plan.streaming_job_model,
945 partial_graph_manager
946 .control_stream_manager()
947 .env
948 .actor_id_generator(),
949 worker_nodes,
950 adaptive_parallelism_strategy,
951 &ensembles,
952 &plan.database_resource_group,
953 )?;
954
955 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
958 let actor_id_counter = partial_graph_manager
959 .control_stream_manager()
960 .env
961 .actor_id_generator();
962 for sink_ctx in sinks {
963 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
964 let original_frag_info = self.database_info.fragment(original_fragment_id);
965 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
966 original_frag_info,
967 );
968 let new_aligner = ComponentFragmentAligner::new_persistent(
969 &actor_template,
970 actor_id_counter,
971 );
972 let distribution_type: DistributionType =
973 sink_ctx.new_fragment.distribution_type.into();
974 let actor_assignments =
975 new_aligner.align_component_actor(distribution_type);
976 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
977 let mut actors = Vec::with_capacity(actor_assignments.len());
978 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
979 render_result.actor_location.insert(actor_id, *worker_id);
980 actors.push(StreamActor {
981 actor_id,
982 fragment_id: new_fragment_id,
983 vnode_bitmap: vnode_bitmap.clone(),
984 mview_definition: String::new(),
985 expr_context: Some(sink_ctx.ctx.to_expr_context()),
986 config_override: sink_ctx.ctx.config_override.clone(),
987 });
988 }
989 render_result.stream_actors.insert(new_fragment_id, actors);
990 }
991 }
992
993 let mut edges = self.database_info.build_edge(
995 None,
996 Some(&plan),
997 None,
998 partial_graph_manager.control_stream_manager(),
999 &render_result.stream_actors,
1000 &render_result.actor_location,
1001 );
1002
1003 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1005 .stream_actors
1006 .iter()
1007 .map(|(fragment_id, actors)| {
1008 (
1009 *fragment_id,
1010 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1011 )
1012 })
1013 .collect();
1014 let resolved_split_assignment = match &plan.split_plan {
1015 ReplaceJobSplitPlan::Discovered(discovered) => {
1016 SourceManager::resolve_fragment_to_actor_splits(
1017 &plan.new_fragments,
1018 discovered,
1019 &fragment_actor_ids,
1020 )?
1021 }
1022 ReplaceJobSplitPlan::AlignFromPrevious => {
1023 SourceManager::resolve_replace_source_splits(
1024 &plan.new_fragments,
1025 &plan.replace_upstream,
1026 edges.actor_new_no_shuffle(),
1027 |_fragment_id, actor_id| {
1028 self.database_info.fragment_infos().find_map(|fragment| {
1029 fragment
1030 .actors
1031 .get(&actor_id)
1032 .map(|info| info.splits.clone())
1033 })
1034 },
1035 )?
1036 }
1037 };
1038
1039 self.database_info.pre_apply_new_fragments(
1041 plan.new_fragments
1042 .new_fragment_info(
1043 &render_result.stream_actors,
1044 &render_result.actor_location,
1045 &resolved_split_assignment,
1046 )
1047 .map(|(fragment_id, new_fragment)| {
1048 (fragment_id, plan.streaming_job.id(), new_fragment)
1049 }),
1050 );
1051 for (fragment_id, replace_map) in &plan.replace_upstream {
1052 self.database_info
1053 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1054 }
1055 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1056 self.database_info
1057 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1058 (
1059 sink.new_fragment.fragment_id,
1060 sink.original_sink.id.as_job_id(),
1061 sink.new_fragment_info(
1062 &render_result.stream_actors,
1063 &render_result.actor_location,
1064 ),
1065 )
1066 }));
1067 }
1068
1069 let (table_ids, node_actors) = self.collect_base_info();
1070
1071 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1073 &plan,
1074 &mut edges,
1075 &self.database_info,
1076 &render_result.stream_actors,
1077 &render_result.actor_location,
1078 ));
1079
1080 let mutation = Command::replace_stream_job_to_mutation(
1083 &plan,
1084 &mut edges,
1085 &mut self.database_info,
1086 &resolved_split_assignment,
1087 )?;
1088
1089 {
1091 let mut fragment_ids_to_remove: Vec<_> = plan
1092 .old_fragments
1093 .fragments
1094 .values()
1095 .map(|f| f.fragment_id)
1096 .collect();
1097 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1098 fragment_ids_to_remove
1099 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1100 }
1101 self.database_info
1102 .post_apply_remove_fragments(fragment_ids_to_remove);
1103 }
1104
1105 (
1106 mutation,
1107 table_ids,
1108 actors_to_create,
1109 node_actors,
1110 PostCollectCommand::ReplaceStreamJob {
1111 plan,
1112 resolved_split_assignment,
1113 },
1114 )
1115 }
1116
1117 Some(Command::SourceChangeSplit(split_state)) => {
1118 self.database_info.pre_apply_split_assignments(
1120 split_state
1121 .split_assignment
1122 .iter()
1123 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1124 );
1125
1126 let mutation = Some(Command::source_change_split_to_mutation(
1127 &split_state.split_assignment,
1128 ));
1129 let (table_ids, node_actors) = self.collect_base_info();
1130 (
1131 mutation,
1132 table_ids,
1133 None,
1134 node_actors,
1135 PostCollectCommand::SourceChangeSplit {
1136 split_assignment: split_state.split_assignment,
1137 },
1138 )
1139 }
1140
1141 Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Track the new subscriber on the upstream MV's job so the
                // in-flight database info knows about its retention setting.
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // A missing subscriber is tolerated with a warning rather than
                // an error, so a duplicated/stale drop does not fail the barrier.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // In-memory bookkeeping only; no mutation is sent to actors
                // (the command is applied with a `None` mutation below).
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    // `config` is moved into the post-collect command for
                    // handling after the barrier is collected.
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                // Only the associated source id is needed to build the mutation.
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                // Building this mutation can fail (note the `?`); the error is
                // propagated to the caller.
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };

        // Jobs whose snapshot backfill finishes at this barrier; returned to the
        // caller (as `jobs_to_wait`) at the end of this method.
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                // No command-derived mutation. On a checkpoint barrier, check
                // whether any independent snapshot-backfill creating job is
                // ready to be merged into the upstream graph.
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
                        let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
                            job;
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, &barrier_info)?;
                            // Each job id appears at most once in the map.
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    // Merge each finished job into this database's graph:
                    // re-create its actors under freshly allocated actor ids,
                    // rewire the upstream dispatchers to the new actors, and
                    // drop the temporary snapshot-backfill subscriptions.
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // The job held a subscription on each upstream table
                        // during backfill; announce their removal downstream.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            // The tracked subscriber must exist and must be the
                            // snapshot-backfill flavor; anything else is a bug.
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The job's state tables are committed together with the
                        // rest of the database from this barrier on.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Allocate a contiguous range of new global actor ids,
                        // one per existing actor of the job.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // Map: old actor id -> newly allocated global actor id.
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        // Re-key the stream actors and fragment infos by the
                        // new actor ids.
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        // Carry the connector split assignments over to the new
                        // actor ids.
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // The merged job belongs to the database's default
                        // partial graph (the `None` job slot).
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        // Build edges between the upstream fragments (already in
                        // the database) and the job's re-created fragments.
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|fragment| {
                                    (
                                        fragment.fragment_id,
                                        EdgeBuilderFragmentInfo::from_inflight(
                                            fragment,
                                            partial_graph_id,
                                            partial_graph_manager.control_stream_manager(),
                                        ),
                                    )
                                }),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    // Fourth element intentionally empty here.
                                    [],
                                )
                            }),
                        );
                        // For each upstream fragment, emit dispatcher updates
                        // that replace the old (backfill-time) downstream actor
                        // ids with the newly allocated ones.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    // Reverse-map each new downstream
                                                    // actor id back to the old id the
                                                    // upstream currently dispatches to,
                                                    // so the old target is removed.
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        // Every built edge must have been consumed above.
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        // From now on the job is tracked as an ordinary, fully
                        // created streaming job of this database.
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    // Single Update mutation covering all merged jobs: dispatcher
                    // rewires, split re-assignments, and subscription drops.
                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // No merge happened: if some backfill fragments are pending,
                    // tell them to start via a StartFragmentBackfill mutation.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        // Forward this upstream barrier to every still-running independent
        // creating job, attaching a throttle mutation when the current command
        // throttles exactly that job.
        for (job_id, job) in &mut self.independent_checkpoint_job_controls {
            let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) = job;
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some((ref jobs, ref config)) =
                    throttle_for_creating_jobs
                    && jobs.contains(job_id)
                {
                    // A rate-limit change on a snapshot-backfill job must not be
                    // batched with other jobs.
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        // Hand the notifiers to the creating job; `notifiers` is
                        // left empty, so the database barrier injected below
                        // carries none.
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    partial_graph_manager,
                    &barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        // Finally inject the barrier into the database's default partial graph
        // with the mutation (if any) computed above.
        partial_graph_manager.inject_barrier(
            to_partial_graph_id(self.database_id, None),
            mutation,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
            PartialGraphBarrierInfo::new(
                post_collect_command,
                barrier_info,
                take(notifiers),
                table_ids_to_commit,
            ),
        )?;

        Ok(ApplyCommandInfo {
            jobs_to_wait: finished_snapshot_backfill_jobs,
        })
    }
}