1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{
45 CreatingStreamingJobControl, DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52 SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
61 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70 fill_snapshot_backfill_epoch,
71};
72
/// Mutable epoch/pause bookkeeping for a database's barrier worker.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The prev epoch of the most recently issued (possibly still in-flight) barrier.
    in_flight_prev_epoch: TracedEpoch,

    /// Prev epochs of barriers issued since the last checkpoint barrier; drained
    /// into `BarrierKind::Checkpoint` when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is currently paused.
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89 pub(super) fn new() -> Self {
90 Self {
91 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92 pending_non_checkpoint_barriers: vec![],
93 is_paused: false,
94 }
95 }
96
97 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98 Self {
99 in_flight_prev_epoch,
100 pending_non_checkpoint_barriers: vec![],
101 is_paused,
102 }
103 }
104
105 pub fn is_paused(&self) -> bool {
106 self.is_paused
107 }
108
109 fn set_is_paused(&mut self, is_paused: bool) {
110 if self.is_paused != is_paused {
111 tracing::info!(
112 currently_paused = self.is_paused,
113 newly_paused = is_paused,
114 "update paused state"
115 );
116 self.is_paused = is_paused;
117 }
118 }
119
120 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121 &self.in_flight_prev_epoch
122 }
123
124 pub fn next_barrier_info(
126 &mut self,
127 is_checkpoint: bool,
128 curr_epoch: TracedEpoch,
129 ) -> BarrierInfo {
130 assert!(
131 self.in_flight_prev_epoch.value() < curr_epoch.value(),
132 "curr epoch regress. {} > {}",
133 self.in_flight_prev_epoch.value(),
134 curr_epoch.value()
135 );
136 let prev_epoch = self.in_flight_prev_epoch.clone();
137 self.in_flight_prev_epoch = curr_epoch.clone();
138 self.pending_non_checkpoint_barriers
139 .push(prev_epoch.value().0);
140 let kind = if is_checkpoint {
141 let epochs = take(&mut self.pending_non_checkpoint_barriers);
142 BarrierKind::Checkpoint(epochs)
143 } else {
144 BarrierKind::Barrier
145 };
146 BarrierInfo {
147 prev_epoch,
148 curr_epoch,
149 kind,
150 }
151 }
152}
153
/// Extra info produced when a command is applied to the checkpoint control.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): presumably the set of creating jobs this barrier has to
    // wait on before completing — the field's consumer is outside this view;
    // confirm against the callers of `apply_command`.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Result of applying a single command, as a tuple of:
/// (barrier mutation to inject, table ids to commit, new actors to create
/// (if any), per-worker actor ids to collect the barrier from, and the
/// command to run after collection).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
167
/// Rendered actor placement for a streaming job, produced by [`render_actors`].
pub(crate) struct RenderResult {
    /// The rendered actors, grouped by the fragment they belong to.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// The worker each rendered actor is scheduled on.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176fn resolve_no_shuffle_ensembles(
185 fragments: &StreamJobFragmentsToCreate,
186 upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191 for (upstream_fid, relations) in &fragments.downstreams {
193 for rel in relations {
194 if rel.dispatcher_type == DispatcherType::NoShuffle {
195 new_no_shuffle
196 .entry(*upstream_fid)
197 .or_default()
198 .insert(rel.downstream_fragment_id);
199 }
200 }
201 }
202
203 for (upstream_fid, relations) in upstream_fragment_downstreams {
205 for rel in relations {
206 if rel.dispatcher_type == DispatcherType::NoShuffle {
207 new_no_shuffle
208 .entry(*upstream_fid)
209 .or_default()
210 .insert(rel.downstream_fragment_id);
211 }
212 }
213 }
214
215 let mut ensembles = if new_no_shuffle.is_empty() {
216 Vec::new()
217 } else {
218 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220 .iter()
221 .flat_map(|(upstream_fid, downstream_fids)| {
222 downstream_fids
223 .iter()
224 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225 })
226 .collect();
227
228 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229 .iter()
230 .flat_map(|(u, d)| [*u, *d])
231 .collect::<HashSet<_>>()
232 .into_iter()
233 .collect();
234
235 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237 };
238
239 let covered: HashSet<FragmentId> = ensembles
241 .iter()
242 .flat_map(|e| e.component_fragments())
243 .collect();
244 for fragment_id in fragments.inner.fragments.keys() {
245 if !covered.contains(fragment_id) {
246 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247 }
248 }
249
250 Ok(ensembles)
251}
252
/// Renders concrete actors for every fragment of a new streaming job,
/// assigning each actor a worker and (optionally) a vnode bitmap, such that
/// all fragments in the same no-shuffle ensemble are aligned actor-by-actor.
///
/// For an ensemble containing fragments that already run in the database
/// (i.e. the new job attaches to them via no-shuffle), the existing
/// fragment's placement is reused as the template; otherwise a fresh
/// placement is rendered from the worker map and parallelism strategy.
///
/// Returns the rendered actors per fragment together with each actor's
/// worker location.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // fragment id -> (actor id -> (worker, vnode bitmap)) for the new fragments.
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Ensemble members that are NOT part of the new job, i.e.
        // already-running fragments the new fragments must align with.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            // Reuse the placement of an existing fragment as the template.
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // All existing fragments in one no-shuffle ensemble must already
            // agree on their placement (asserted below).
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // Entirely-new ensemble: derive distribution type and vnode count
            // from its first component and render a fresh placement.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // Sanity check: fragments in one no-shuffle ensemble must share
            // the same vnode count.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                    different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Materialize per-fragment actor assignments from the shared template,
        // skipping ensemble members that are not part of the new job.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue;
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Flatten the assignments into `StreamActor`s plus an actor -> worker map.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
380impl DatabaseCheckpointControl {
381 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
383 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
384 let node_actors =
385 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
386 (table_ids_to_commit, node_actors)
387 }
388
389 fn apply_simple_command(
393 &self,
394 mutation: Option<Mutation>,
395 command_name: &'static str,
396 ) -> ApplyCommandResult {
397 let (table_ids, node_actors) = self.collect_base_info();
398 (
399 mutation,
400 table_ids,
401 None,
402 node_actors,
403 PostCollectCommand::Command(command_name.to_owned()),
404 )
405 }
406
407 pub(super) fn apply_command(
410 &mut self,
411 command: Option<Command>,
412 notifiers: &mut Vec<Notifier>,
413 barrier_info: BarrierInfo,
414 partial_graph_manager: &mut PartialGraphManager,
415 hummock_version_stats: &HummockVersionStats,
416 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
417 worker_nodes: &HashMap<WorkerId, WorkerNode>,
418 ) -> MetaResult<ApplyCommandInfo> {
419 debug_assert!(
420 !matches!(
421 command,
422 Some(Command::RescheduleIntent {
423 reschedule_plan: None,
424 ..
425 })
426 ),
427 "reschedule intent must be resolved before apply"
428 );
429 if matches!(
430 command,
431 Some(Command::RescheduleIntent {
432 reschedule_plan: None,
433 ..
434 })
435 ) {
436 bail!("reschedule intent must be resolved before apply");
437 }
438
439 fn resolve_source_splits(
444 info: &CreateStreamingJobCommandInfo,
445 render_result: &RenderResult,
446 actor_no_shuffle: &ActorNewNoShuffle,
447 database_info: &InflightDatabaseInfo,
448 ) -> MetaResult<SplitAssignment> {
449 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
450 .stream_actors
451 .iter()
452 .map(|(fragment_id, actors)| {
453 (
454 *fragment_id,
455 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
456 )
457 })
458 .collect();
459 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
460 &info.stream_job_fragments,
461 &info.init_split_assignment,
462 &fragment_actor_ids,
463 )?;
464 resolved.extend(SourceManager::resolve_backfill_splits(
465 &info.stream_job_fragments,
466 actor_no_shuffle,
467 |fragment_id, actor_id| {
468 database_info
469 .fragment(fragment_id)
470 .actors
471 .get(&actor_id)
472 .map(|info| info.splits.clone())
473 },
474 )?);
475 Ok(resolved)
476 }
477
478 let mut throttle_for_creating_jobs: Option<(
480 HashSet<JobId>,
481 HashMap<FragmentId, ThrottleConfig>,
482 )> = None;
483
484 let (
488 mutation,
489 mut table_ids_to_commit,
490 mut actors_to_create,
491 mut node_actors,
492 post_collect_command,
493 ) = match command {
494 None => self.apply_simple_command(None, "barrier"),
495 Some(Command::CreateStreamingJob {
496 mut info,
497 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
498 cross_db_snapshot_backfill_info,
499 }) => {
500 let ensembles = resolve_no_shuffle_ensembles(
501 &info.stream_job_fragments,
502 &info.upstream_fragment_downstreams,
503 )?;
504 let actors = render_actors(
505 &info.stream_job_fragments,
506 &self.database_info,
507 &info.definition,
508 &info.stream_job_fragments.inner.ctx,
509 &info.streaming_job_model,
510 partial_graph_manager
511 .control_stream_manager()
512 .env
513 .actor_id_generator(),
514 worker_nodes,
515 adaptive_parallelism_strategy,
516 &ensembles,
517 &info.database_resource_group,
518 )?;
519 {
520 assert!(!self.state.is_paused());
521 let snapshot_epoch = barrier_info.prev_epoch();
522 for snapshot_backfill_epoch in snapshot_backfill_info
524 .upstream_mv_table_id_to_backfill_epoch
525 .values_mut()
526 {
527 assert_eq!(
528 snapshot_backfill_epoch.replace(snapshot_epoch),
529 None,
530 "must not set previously"
531 );
532 }
533 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
534 fill_snapshot_backfill_epoch(
535 &mut fragment.nodes,
536 Some(&snapshot_backfill_info),
537 &cross_db_snapshot_backfill_info,
538 )?;
539 }
540 let job_id = info.stream_job_fragments.stream_job_id();
541 let snapshot_backfill_upstream_tables = snapshot_backfill_info
542 .upstream_mv_table_id_to_backfill_epoch
543 .keys()
544 .cloned()
545 .collect();
546
547 let mut edges = self.database_info.build_edge(
549 Some((&info, true)),
550 None,
551 None,
552 partial_graph_manager.control_stream_manager(),
553 &actors.stream_actors,
554 &actors.actor_location,
555 );
556 let resolved_split_assignment = resolve_source_splits(
558 &info,
559 &actors,
560 edges.actor_new_no_shuffle(),
561 &self.database_info,
562 )?;
563
564 let Entry::Vacant(entry) =
565 self.independent_checkpoint_job_controls.entry(job_id)
566 else {
567 panic!("duplicated creating snapshot backfill job {job_id}");
568 };
569
570 let job = CreatingStreamingJobControl::new(
571 entry,
572 CreateSnapshotBackfillJobCommandInfo {
573 info: info.clone(),
574 snapshot_backfill_info: snapshot_backfill_info.clone(),
575 cross_db_snapshot_backfill_info,
576 resolved_split_assignment: resolved_split_assignment.clone(),
577 },
578 take(notifiers),
579 snapshot_backfill_upstream_tables,
580 snapshot_epoch,
581 hummock_version_stats,
582 partial_graph_manager,
583 &mut edges,
584 &resolved_split_assignment,
585 &actors,
586 )?;
587
588 if let Some(fragment_infos) = job.fragment_infos() {
589 self.database_info.shared_actor_infos.upsert(
590 self.database_id,
591 fragment_infos.values().map(|f| (f, job_id)),
592 );
593 }
594
595 for upstream_mv_table_id in snapshot_backfill_info
596 .upstream_mv_table_id_to_backfill_epoch
597 .keys()
598 {
599 self.database_info.register_subscriber(
600 upstream_mv_table_id.as_job_id(),
601 info.streaming_job.id().as_subscriber_id(),
602 SubscriberType::SnapshotBackfill,
603 );
604 }
605
606 let mutation = Command::create_streaming_job_to_mutation(
607 &info,
608 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
609 self.state.is_paused(),
610 &mut edges,
611 partial_graph_manager.control_stream_manager(),
612 None,
613 &resolved_split_assignment,
614 &actors.stream_actors,
615 &actors.actor_location,
616 )?;
617
618 let (table_ids, node_actors) = self.collect_base_info();
619 (
620 Some(mutation),
621 table_ids,
622 None,
623 node_actors,
624 PostCollectCommand::barrier(),
625 )
626 }
627 }
628 Some(Command::CreateStreamingJob {
629 mut info,
630 job_type,
631 cross_db_snapshot_backfill_info,
632 }) => {
633 let ensembles = resolve_no_shuffle_ensembles(
634 &info.stream_job_fragments,
635 &info.upstream_fragment_downstreams,
636 )?;
637 let actors = render_actors(
638 &info.stream_job_fragments,
639 &self.database_info,
640 &info.definition,
641 &info.stream_job_fragments.inner.ctx,
642 &info.streaming_job_model,
643 partial_graph_manager
644 .control_stream_manager()
645 .env
646 .actor_id_generator(),
647 worker_nodes,
648 adaptive_parallelism_strategy,
649 &ensembles,
650 &info.database_resource_group,
651 )?;
652 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
653 fill_snapshot_backfill_epoch(
654 &mut fragment.nodes,
655 None,
656 &cross_db_snapshot_backfill_info,
657 )?;
658 }
659
660 let new_upstream_sink =
662 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
663 Some(ctx)
664 } else {
665 None
666 };
667
668 let mut edges = self.database_info.build_edge(
669 Some((&info, false)),
670 None,
671 new_upstream_sink,
672 partial_graph_manager.control_stream_manager(),
673 &actors.stream_actors,
674 &actors.actor_location,
675 );
676 let resolved_split_assignment = resolve_source_splits(
678 &info,
679 &actors,
680 edges.actor_new_no_shuffle(),
681 &self.database_info,
682 )?;
683
684 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
686 let (fragment, _) =
687 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
688 .expect("should have parallel cdc fragment");
689 Some(CdcTableBackfillTracker::new(
690 fragment.fragment_id,
691 splits.clone(),
692 ))
693 } else {
694 None
695 };
696 self.database_info
697 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
698 self.database_info.pre_apply_new_fragments(
699 info.stream_job_fragments
700 .new_fragment_info(
701 &actors.stream_actors,
702 &actors.actor_location,
703 &resolved_split_assignment,
704 )
705 .map(|(fragment_id, fragment_infos)| {
706 (fragment_id, info.streaming_job.id(), fragment_infos)
707 }),
708 );
709 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
710 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
711 self.database_info.pre_apply_add_node_upstream(
712 downstream_fragment_id,
713 &PbUpstreamSinkInfo {
714 upstream_fragment_id: ctx.sink_fragment_id,
715 sink_output_schema: ctx.sink_output_fields.clone(),
716 project_exprs: ctx.project_exprs.clone(),
717 },
718 );
719 }
720
721 let (table_ids, node_actors) = self.collect_base_info();
722
723 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
725 &info,
726 &mut edges,
727 &actors.stream_actors,
728 &actors.actor_location,
729 ));
730
731 let actor_cdc_table_snapshot_splits = self
733 .database_info
734 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
735
736 let is_currently_paused = self.state.is_paused();
738 let mutation = Command::create_streaming_job_to_mutation(
739 &info,
740 &job_type,
741 is_currently_paused,
742 &mut edges,
743 partial_graph_manager.control_stream_manager(),
744 actor_cdc_table_snapshot_splits,
745 &resolved_split_assignment,
746 &actors.stream_actors,
747 &actors.actor_location,
748 )?;
749
750 (
751 Some(mutation),
752 table_ids,
753 actors_to_create,
754 node_actors,
755 PostCollectCommand::CreateStreamingJob {
756 info,
757 job_type,
758 cross_db_snapshot_backfill_info,
759 resolved_split_assignment,
760 },
761 )
762 }
763
764 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
765
766 Some(Command::Pause) => {
767 let prev_is_paused = self.state.is_paused();
768 self.state.set_is_paused(true);
769 let mutation = Command::pause_to_mutation(prev_is_paused);
770 let (table_ids, node_actors) = self.collect_base_info();
771 (
772 mutation,
773 table_ids,
774 None,
775 node_actors,
776 PostCollectCommand::Command("Pause".to_owned()),
777 )
778 }
779
780 Some(Command::Resume) => {
781 let prev_is_paused = self.state.is_paused();
782 self.state.set_is_paused(false);
783 let mutation = Command::resume_to_mutation(prev_is_paused);
784 let (table_ids, node_actors) = self.collect_base_info();
785 (
786 mutation,
787 table_ids,
788 None,
789 node_actors,
790 PostCollectCommand::Command("Resume".to_owned()),
791 )
792 }
793
794 Some(Command::Throttle { jobs, config }) => {
795 let mutation = Some(Command::throttle_to_mutation(&config));
796 for (fragment_id, throttle_config) in &config {
797 self.database_info
798 .pre_apply_throttle(*fragment_id, throttle_config);
799 }
800 throttle_for_creating_jobs = Some((jobs, config));
801 self.apply_simple_command(mutation, "Throttle")
802 }
803
804 Some(Command::DropStreamingJobs {
805 streaming_job_ids,
806 unregistered_fragment_ids,
807 dropped_sink_fragment_by_targets,
808 ..
809 }) => {
810 let actors = self
811 .database_info
812 .fragment_infos()
813 .filter(|fragment| {
814 self.database_info
815 .job_id_by_fragment(fragment.fragment_id)
816 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
817 })
818 .flat_map(|fragment| fragment.actors.keys().copied())
819 .collect::<Vec<_>>();
820
821 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
823 self.database_info
824 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
825 }
826
827 let (table_ids, node_actors) = self.collect_base_info();
828
829 self.database_info
831 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
832
833 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
834 &actors,
835 &dropped_sink_fragment_by_targets,
836 ));
837 (
838 mutation,
839 table_ids,
840 None,
841 node_actors,
842 PostCollectCommand::DropStreamingJobs,
843 )
844 }
845
846 Some(Command::RescheduleIntent {
847 reschedule_plan, ..
848 }) => {
849 let ReschedulePlan {
850 reschedules,
851 fragment_actors,
852 } = reschedule_plan
853 .as_ref()
854 .expect("reschedule intent should be resolved in global barrier worker");
855
856 for (fragment_id, reschedule) in reschedules {
858 self.database_info.pre_apply_reschedule(
859 *fragment_id,
860 reschedule
861 .added_actors
862 .iter()
863 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
864 actors.iter().map(|actor_id| {
865 (
866 *actor_id,
867 InflightActorInfo {
868 worker_id: *node_id,
869 vnode_bitmap: reschedule
870 .newly_created_actors
871 .get(actor_id)
872 .expect("should exist")
873 .0
874 .0
875 .vnode_bitmap
876 .clone(),
877 splits: reschedule
878 .actor_splits
879 .get(actor_id)
880 .cloned()
881 .unwrap_or_default(),
882 },
883 )
884 })
885 })
886 .collect(),
887 reschedule
888 .vnode_bitmap_updates
889 .iter()
890 .filter(|(actor_id, _)| {
891 !reschedule.newly_created_actors.contains_key(*actor_id)
892 })
893 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
894 .collect(),
895 reschedule.actor_splits.clone(),
896 );
897 }
898
899 let (table_ids, node_actors) = self.collect_base_info();
900
901 let actors_to_create = Some(Command::reschedule_actors_to_create(
903 reschedules,
904 fragment_actors,
905 &self.database_info,
906 partial_graph_manager.control_stream_manager(),
907 ));
908
909 self.database_info
911 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
912 (
913 *fragment_id,
914 reschedule.removed_actors.iter().cloned().collect(),
915 )
916 }));
917
918 let mutation = Command::reschedule_to_mutation(
920 reschedules,
921 fragment_actors,
922 partial_graph_manager.control_stream_manager(),
923 &mut self.database_info,
924 )?;
925
926 let reschedules = reschedule_plan
927 .expect("reschedule intent should be resolved in global barrier worker")
928 .reschedules;
929 (
930 mutation,
931 table_ids,
932 actors_to_create,
933 node_actors,
934 PostCollectCommand::Reschedule { reschedules },
935 )
936 }
937
938 Some(Command::ReplaceStreamJob(plan)) => {
939 let ensembles = resolve_no_shuffle_ensembles(
940 &plan.new_fragments,
941 &plan.upstream_fragment_downstreams,
942 )?;
943 let mut render_result = render_actors(
944 &plan.new_fragments,
945 &self.database_info,
946 "", &plan.new_fragments.inner.ctx,
948 &plan.streaming_job_model,
949 partial_graph_manager
950 .control_stream_manager()
951 .env
952 .actor_id_generator(),
953 worker_nodes,
954 adaptive_parallelism_strategy,
955 &ensembles,
956 &plan.database_resource_group,
957 )?;
958
959 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
962 let actor_id_counter = partial_graph_manager
963 .control_stream_manager()
964 .env
965 .actor_id_generator();
966 for sink_ctx in sinks {
967 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
968 let original_frag_info = self.database_info.fragment(original_fragment_id);
969 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
970 original_frag_info,
971 );
972 let new_aligner = ComponentFragmentAligner::new_persistent(
973 &actor_template,
974 actor_id_counter,
975 );
976 let distribution_type: DistributionType =
977 sink_ctx.new_fragment.distribution_type.into();
978 let actor_assignments =
979 new_aligner.align_component_actor(distribution_type);
980 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
981 let mut actors = Vec::with_capacity(actor_assignments.len());
982 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
983 render_result.actor_location.insert(actor_id, *worker_id);
984 actors.push(StreamActor {
985 actor_id,
986 fragment_id: new_fragment_id,
987 vnode_bitmap: vnode_bitmap.clone(),
988 mview_definition: String::new(),
989 expr_context: Some(sink_ctx.ctx.to_expr_context()),
990 config_override: sink_ctx.ctx.config_override.clone(),
991 });
992 }
993 render_result.stream_actors.insert(new_fragment_id, actors);
994 }
995 }
996
997 let mut edges = self.database_info.build_edge(
999 None,
1000 Some(&plan),
1001 None,
1002 partial_graph_manager.control_stream_manager(),
1003 &render_result.stream_actors,
1004 &render_result.actor_location,
1005 );
1006
1007 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1009 .stream_actors
1010 .iter()
1011 .map(|(fragment_id, actors)| {
1012 (
1013 *fragment_id,
1014 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1015 )
1016 })
1017 .collect();
1018 let resolved_split_assignment = match &plan.split_plan {
1019 ReplaceJobSplitPlan::Discovered(discovered) => {
1020 SourceManager::resolve_fragment_to_actor_splits(
1021 &plan.new_fragments,
1022 discovered,
1023 &fragment_actor_ids,
1024 )?
1025 }
1026 ReplaceJobSplitPlan::AlignFromPrevious => {
1027 SourceManager::resolve_replace_source_splits(
1028 &plan.new_fragments,
1029 &plan.replace_upstream,
1030 edges.actor_new_no_shuffle(),
1031 |_fragment_id, actor_id| {
1032 self.database_info.fragment_infos().find_map(|fragment| {
1033 fragment
1034 .actors
1035 .get(&actor_id)
1036 .map(|info| info.splits.clone())
1037 })
1038 },
1039 )?
1040 }
1041 };
1042
1043 self.database_info.pre_apply_new_fragments(
1045 plan.new_fragments
1046 .new_fragment_info(
1047 &render_result.stream_actors,
1048 &render_result.actor_location,
1049 &resolved_split_assignment,
1050 )
1051 .map(|(fragment_id, new_fragment)| {
1052 (fragment_id, plan.streaming_job.id(), new_fragment)
1053 }),
1054 );
1055 for (fragment_id, replace_map) in &plan.replace_upstream {
1056 self.database_info
1057 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1058 }
1059 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1060 self.database_info
1061 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1062 (
1063 sink.new_fragment.fragment_id,
1064 sink.original_sink.id.as_job_id(),
1065 sink.new_fragment_info(
1066 &render_result.stream_actors,
1067 &render_result.actor_location,
1068 ),
1069 )
1070 }));
1071 }
1072
1073 let (table_ids, node_actors) = self.collect_base_info();
1074
1075 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1077 &plan,
1078 &mut edges,
1079 &self.database_info,
1080 &render_result.stream_actors,
1081 &render_result.actor_location,
1082 ));
1083
1084 let mutation = Command::replace_stream_job_to_mutation(
1087 &plan,
1088 &mut edges,
1089 &mut self.database_info,
1090 &resolved_split_assignment,
1091 )?;
1092
1093 {
1095 let mut fragment_ids_to_remove: Vec<_> = plan
1096 .old_fragments
1097 .fragments
1098 .values()
1099 .map(|f| f.fragment_id)
1100 .collect();
1101 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1102 fragment_ids_to_remove
1103 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1104 }
1105 self.database_info
1106 .post_apply_remove_fragments(fragment_ids_to_remove);
1107 }
1108
1109 (
1110 mutation,
1111 table_ids,
1112 actors_to_create,
1113 node_actors,
1114 PostCollectCommand::ReplaceStreamJob {
1115 plan,
1116 resolved_split_assignment,
1117 },
1118 )
1119 }
1120
1121 Some(Command::SourceChangeSplit(split_state)) => {
1122 self.database_info.pre_apply_split_assignments(
1124 split_state
1125 .split_assignment
1126 .iter()
1127 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1128 );
1129
1130 let mutation = Some(Command::source_change_split_to_mutation(
1131 &split_state.split_assignment,
1132 ));
1133 let (table_ids, node_actors) = self.collect_base_info();
1134 (
1135 mutation,
1136 table_ids,
1137 None,
1138 node_actors,
1139 PostCollectCommand::SourceChangeSplit {
1140 split_assignment: split_state.split_assignment,
1141 },
1142 )
1143 }
1144
1145 Some(Command::CreateSubscription {
1146 subscription_id,
1147 upstream_mv_table_id,
1148 retention_second,
1149 }) => {
1150 self.database_info.register_subscriber(
1151 upstream_mv_table_id.as_job_id(),
1152 subscription_id.as_subscriber_id(),
1153 SubscriberType::Subscription(retention_second),
1154 );
1155 let mutation = Some(Command::create_subscription_to_mutation(
1156 upstream_mv_table_id,
1157 subscription_id,
1158 ));
1159 let (table_ids, node_actors) = self.collect_base_info();
1160 (
1161 mutation,
1162 table_ids,
1163 None,
1164 node_actors,
1165 PostCollectCommand::CreateSubscription { subscription_id },
1166 )
1167 }
1168
1169 Some(Command::DropSubscription {
1170 subscription_id,
1171 upstream_mv_table_id,
1172 }) => {
1173 if self
1174 .database_info
1175 .unregister_subscriber(
1176 upstream_mv_table_id.as_job_id(),
1177 subscription_id.as_subscriber_id(),
1178 )
1179 .is_none()
1180 {
1181 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1182 }
1183 let mutation = Some(Command::drop_subscription_to_mutation(
1184 upstream_mv_table_id,
1185 subscription_id,
1186 ));
1187 let (table_ids, node_actors) = self.collect_base_info();
1188 (
1189 mutation,
1190 table_ids,
1191 None,
1192 node_actors,
1193 PostCollectCommand::Command("DropSubscription".to_owned()),
1194 )
1195 }
1196
1197 Some(Command::AlterSubscriptionRetention {
1198 subscription_id,
1199 upstream_mv_table_id,
1200 retention_second,
1201 }) => {
1202 self.database_info.update_subscription_retention(
1203 upstream_mv_table_id.as_job_id(),
1204 subscription_id.as_subscriber_id(),
1205 retention_second,
1206 );
1207 self.apply_simple_command(None, "AlterSubscriptionRetention")
1208 }
1209
1210 Some(Command::ConnectorPropsChange(config)) => {
1211 let mutation = Some(Command::connector_props_change_to_mutation(&config));
1212 let (table_ids, node_actors) = self.collect_base_info();
1213 (
1214 mutation,
1215 table_ids,
1216 None,
1217 node_actors,
1218 PostCollectCommand::ConnectorPropsChange(config),
1219 )
1220 }
1221
1222 Some(Command::Refresh {
1223 table_id,
1224 associated_source_id,
1225 }) => {
1226 let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1227 self.apply_simple_command(mutation, "Refresh")
1228 }
1229
1230 Some(Command::ListFinish {
1231 table_id: _,
1232 associated_source_id,
1233 }) => {
1234 let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1235 self.apply_simple_command(mutation, "ListFinish")
1236 }
1237
1238 Some(Command::LoadFinish {
1239 table_id: _,
1240 associated_source_id,
1241 }) => {
1242 let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1243 self.apply_simple_command(mutation, "LoadFinish")
1244 }
1245
1246 Some(Command::ResetSource { source_id }) => {
1247 let mutation = Some(Command::reset_source_to_mutation(source_id));
1248 self.apply_simple_command(mutation, "ResetSource")
1249 }
1250
1251 Some(Command::ResumeBackfill { target }) => {
1252 let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1253 let (table_ids, node_actors) = self.collect_base_info();
1254 (
1255 mutation,
1256 table_ids,
1257 None,
1258 node_actors,
1259 PostCollectCommand::ResumeBackfill { target },
1260 )
1261 }
1262
1263 Some(Command::InjectSourceOffsets {
1264 source_id,
1265 split_offsets,
1266 }) => {
1267 let mutation = Some(Command::inject_source_offsets_to_mutation(
1268 source_id,
1269 &split_offsets,
1270 ));
1271 self.apply_simple_command(mutation, "InjectSourceOffsets")
1272 }
1273 };
1274
1275 let mut finished_snapshot_backfill_jobs = HashSet::new();
1276 let mutation = match mutation {
1277 Some(mutation) => Some(mutation),
1278 None => {
1279 let mut finished_snapshot_backfill_job_info = HashMap::new();
1280 if barrier_info.kind.is_checkpoint() {
1281 for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1282 let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1283 job;
1284 if creating_job.should_merge_to_upstream() {
1285 let info = creating_job
1286 .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1287 finished_snapshot_backfill_job_info
1288 .try_insert(job_id, info)
1289 .expect("non-duplicated");
1290 }
1291 }
1292 }
1293
1294 if !finished_snapshot_backfill_job_info.is_empty() {
1295 let actors_to_create = actors_to_create.get_or_insert_default();
1296 let mut subscriptions_to_drop = vec![];
1297 let mut dispatcher_update = vec![];
1298 let mut actor_splits = HashMap::new();
1299 for (job_id, info) in finished_snapshot_backfill_job_info {
1300 finished_snapshot_backfill_jobs.insert(job_id);
1301 subscriptions_to_drop.extend(
1302 info.snapshot_backfill_upstream_tables.iter().map(
1303 |upstream_table_id| PbSubscriptionUpstreamInfo {
1304 subscriber_id: job_id.as_subscriber_id(),
1305 upstream_mv_table_id: *upstream_table_id,
1306 },
1307 ),
1308 );
1309 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1310 assert_matches!(
1311 self.database_info.unregister_subscriber(
1312 upstream_mv_table_id.as_job_id(),
1313 job_id.as_subscriber_id()
1314 ),
1315 Some(SubscriberType::SnapshotBackfill)
1316 );
1317 }
1318
1319 table_ids_to_commit.extend(
1320 info.fragment_infos
1321 .values()
1322 .flat_map(|fragment| fragment.state_table_ids.iter())
1323 .copied(),
1324 );
1325
1326 let actor_len = info
1327 .fragment_infos
1328 .values()
1329 .map(|fragment| fragment.actors.len() as u64)
1330 .sum();
1331 let id_gen = GlobalActorIdGen::new(
1332 partial_graph_manager
1333 .control_stream_manager()
1334 .env
1335 .actor_id_generator(),
1336 actor_len,
1337 );
1338 let mut next_local_actor_id = 0;
1339 let actor_mapping: HashMap<_, _> = info
1341 .fragment_infos
1342 .values()
1343 .flat_map(|fragment| fragment.actors.keys())
1344 .map(|old_actor_id| {
1345 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1346 next_local_actor_id += 1;
1347 (*old_actor_id, new_actor_id.as_global_id())
1348 })
1349 .collect();
1350 let actor_mapping = &actor_mapping;
1351 let new_stream_actors: HashMap<_, _> = info
1352 .stream_actors
1353 .into_iter()
1354 .map(|(old_actor_id, mut actor)| {
1355 let new_actor_id = actor_mapping[&old_actor_id];
1356 actor.actor_id = new_actor_id;
1357 (new_actor_id, actor)
1358 })
1359 .collect();
1360 let new_fragment_info: HashMap<_, _> = info
1361 .fragment_infos
1362 .into_iter()
1363 .map(|(fragment_id, mut fragment)| {
1364 let actors = take(&mut fragment.actors);
1365 fragment.actors = actors
1366 .into_iter()
1367 .map(|(old_actor_id, actor)| {
1368 let new_actor_id = actor_mapping[&old_actor_id];
1369 (new_actor_id, actor)
1370 })
1371 .collect();
1372 (fragment_id, fragment)
1373 })
1374 .collect();
1375 actor_splits.extend(
1376 new_fragment_info
1377 .values()
1378 .flat_map(|fragment| &fragment.actors)
1379 .map(|(actor_id, actor)| {
1380 (
1381 *actor_id,
1382 ConnectorSplits {
1383 splits: actor
1384 .splits
1385 .iter()
1386 .map(ConnectorSplit::from)
1387 .collect(),
1388 },
1389 )
1390 }),
1391 );
1392 let partial_graph_id = to_partial_graph_id(self.database_id, None);
1394 let mut edge_builder = FragmentEdgeBuilder::new(
1395 info.upstream_fragment_downstreams
1396 .keys()
1397 .map(|upstream_fragment_id| {
1398 self.database_info.fragment(*upstream_fragment_id)
1399 })
1400 .chain(new_fragment_info.values())
1401 .map(|fragment| {
1402 (
1403 fragment.fragment_id,
1404 EdgeBuilderFragmentInfo::from_inflight(
1405 fragment,
1406 partial_graph_id,
1407 partial_graph_manager.control_stream_manager(),
1408 ),
1409 )
1410 }),
1411 );
1412 edge_builder.add_relations(&info.upstream_fragment_downstreams);
1413 edge_builder.add_relations(&info.downstreams);
1414 let mut edges = edge_builder.build();
1415 let new_actors_to_create = edges.collect_actors_to_create(
1416 new_fragment_info.values().map(|fragment| {
1417 (
1418 fragment.fragment_id,
1419 &fragment.nodes,
1420 fragment.actors.iter().map(|(actor_id, actor)| {
1421 (&new_stream_actors[actor_id], actor.worker_id)
1422 }),
1423 [], )
1425 }),
1426 );
1427 dispatcher_update.extend(
1428 info.upstream_fragment_downstreams.keys().flat_map(
1429 |upstream_fragment_id| {
1430 let new_actor_dispatchers = edges
1431 .dispatchers
1432 .remove(upstream_fragment_id)
1433 .expect("should exist");
1434 new_actor_dispatchers.into_iter().flat_map(
1435 |(upstream_actor_id, dispatchers)| {
1436 dispatchers.into_iter().map(move |dispatcher| {
1437 PbDispatcherUpdate {
1438 actor_id: upstream_actor_id,
1439 dispatcher_id: dispatcher.dispatcher_id,
1440 hash_mapping: dispatcher.hash_mapping,
1441 removed_downstream_actor_id: dispatcher
1442 .downstream_actor_id
1443 .iter()
1444 .map(|new_downstream_actor_id| {
1445 actor_mapping
1446 .iter()
1447 .find_map(
1448 |(old_actor_id, new_actor_id)| {
1449 (new_downstream_actor_id
1450 == new_actor_id)
1451 .then_some(*old_actor_id)
1452 },
1453 )
1454 .expect("should exist")
1455 })
1456 .collect(),
1457 added_downstream_actor_id: dispatcher
1458 .downstream_actor_id,
1459 }
1460 })
1461 },
1462 )
1463 },
1464 ),
1465 );
1466 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1467 for (worker_id, worker_actors) in new_actors_to_create {
1468 node_actors.entry(worker_id).or_default().extend(
1469 worker_actors.values().flat_map(|(_, actors, _)| {
1470 actors.iter().map(|(actor, _, _)| actor.actor_id)
1471 }),
1472 );
1473 actors_to_create
1474 .entry(worker_id)
1475 .or_default()
1476 .extend(worker_actors);
1477 }
1478 self.database_info.add_existing(InflightStreamingJobInfo {
1479 job_id,
1480 fragment_infos: new_fragment_info,
1481 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
1483 cdc_table_backfill_tracker: None, });
1485 }
1486
1487 Some(PbMutation::Update(PbUpdateMutation {
1488 dispatcher_update,
1489 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
1493 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
1497 }))
1498 } else {
1499 let fragment_ids = self.database_info.take_pending_backfill_nodes();
1500 if fragment_ids.is_empty() {
1501 None
1502 } else {
1503 Some(PbMutation::StartFragmentBackfill(
1504 PbStartFragmentBackfillMutation { fragment_ids },
1505 ))
1506 }
1507 }
1508 }
1509 };
1510
1511 for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1513 let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) = job;
1514 if !finished_snapshot_backfill_jobs.contains(job_id) {
1515 let throttle_mutation = if let Some((ref jobs, ref config)) =
1516 throttle_for_creating_jobs
1517 && jobs.contains(job_id)
1518 {
1519 assert_eq!(
1520 jobs.len(),
1521 1,
1522 "should not alter rate limit of snapshot backfill job with other jobs"
1523 );
1524 Some((
1525 Mutation::Throttle(ThrottleMutation {
1526 fragment_throttle: config
1527 .iter()
1528 .map(|(fragment_id, config)| (*fragment_id, *config))
1529 .collect(),
1530 }),
1531 take(notifiers),
1532 ))
1533 } else {
1534 None
1535 };
1536 creating_job.on_new_upstream_barrier(
1537 partial_graph_manager,
1538 &barrier_info,
1539 throttle_mutation,
1540 )?;
1541 }
1542 }
1543
1544 partial_graph_manager.inject_barrier(
1545 to_partial_graph_id(self.database_id, None),
1546 mutation,
1547 &node_actors,
1548 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1549 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1550 actors_to_create,
1551 PartialGraphBarrierInfo::new(
1552 post_collect_command,
1553 barrier_info,
1554 take(notifiers),
1555 table_ids_to_commit,
1556 ),
1557 )?;
1558
1559 Ok(ApplyCommandInfo {
1560 jobs_to_wait: finished_snapshot_backfill_jobs,
1561 })
1562 }
1563}