1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{
45 CreatingStreamingJobControl, DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52 SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
61 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70 fill_snapshot_backfill_epoch,
71};
72
pub(in crate::barrier) struct BarrierWorkerState {
    /// The prev epoch of the most recently issued barrier; becomes the
    /// `prev_epoch` of the next barrier produced by `next_barrier_info`.
    in_flight_prev_epoch: TracedEpoch,

    /// Prev epochs of barriers issued since the last checkpoint barrier;
    /// drained into `BarrierKind::Checkpoint` when a checkpoint is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Paused flag, toggled via `set_is_paused` (driven by Pause/Resume commands).
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89 pub(super) fn new() -> Self {
90 Self {
91 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92 pending_non_checkpoint_barriers: vec![],
93 is_paused: false,
94 }
95 }
96
97 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98 Self {
99 in_flight_prev_epoch,
100 pending_non_checkpoint_barriers: vec![],
101 is_paused,
102 }
103 }
104
105 pub fn is_paused(&self) -> bool {
106 self.is_paused
107 }
108
109 fn set_is_paused(&mut self, is_paused: bool) {
110 if self.is_paused != is_paused {
111 tracing::info!(
112 currently_paused = self.is_paused,
113 newly_paused = is_paused,
114 "update paused state"
115 );
116 self.is_paused = is_paused;
117 }
118 }
119
120 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121 &self.in_flight_prev_epoch
122 }
123
124 pub fn next_barrier_info(
126 &mut self,
127 is_checkpoint: bool,
128 curr_epoch: TracedEpoch,
129 ) -> BarrierInfo {
130 assert!(
131 self.in_flight_prev_epoch.value() < curr_epoch.value(),
132 "curr epoch regress. {} > {}",
133 self.in_flight_prev_epoch.value(),
134 curr_epoch.value()
135 );
136 let prev_epoch = self.in_flight_prev_epoch.clone();
137 self.in_flight_prev_epoch = curr_epoch.clone();
138 self.pending_non_checkpoint_barriers
139 .push(prev_epoch.value().0);
140 let kind = if is_checkpoint {
141 let epochs = take(&mut self.pending_non_checkpoint_barriers);
142 BarrierKind::Checkpoint(epochs)
143 } else {
144 BarrierKind::Barrier
145 };
146 BarrierInfo {
147 prev_epoch,
148 curr_epoch,
149 kind,
150 }
151 }
152}
153
pub(super) struct ApplyCommandInfo {
    /// Ids of jobs to wait for, as determined while applying a command.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Intermediate result of applying a command to a database checkpoint:
/// (mutation to inject with the barrier, table ids to commit, actors to
/// create (if any), per-worker actor ids to collect, and the post-collect
/// command to run after the barrier is collected).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
167
pub(crate) struct RenderResult {
    /// Rendered actors, keyed by the fragment they belong to.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker placement for every rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176fn resolve_no_shuffle_ensembles(
185 fragments: &StreamJobFragmentsToCreate,
186 upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191 for (upstream_fid, relations) in &fragments.downstreams {
193 for rel in relations {
194 if rel.dispatcher_type == DispatcherType::NoShuffle {
195 new_no_shuffle
196 .entry(*upstream_fid)
197 .or_default()
198 .insert(rel.downstream_fragment_id);
199 }
200 }
201 }
202
203 for (upstream_fid, relations) in upstream_fragment_downstreams {
205 for rel in relations {
206 if rel.dispatcher_type == DispatcherType::NoShuffle {
207 new_no_shuffle
208 .entry(*upstream_fid)
209 .or_default()
210 .insert(rel.downstream_fragment_id);
211 }
212 }
213 }
214
215 let mut ensembles = if new_no_shuffle.is_empty() {
216 Vec::new()
217 } else {
218 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220 .iter()
221 .flat_map(|(upstream_fid, downstream_fids)| {
222 downstream_fids
223 .iter()
224 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225 })
226 .collect();
227
228 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229 .iter()
230 .flat_map(|(u, d)| [*u, *d])
231 .collect::<HashSet<_>>()
232 .into_iter()
233 .collect();
234
235 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237 };
238
239 let covered: HashSet<FragmentId> = ensembles
241 .iter()
242 .flat_map(|e| e.component_fragments())
243 .collect();
244 for fragment_id in fragments.inner.fragments.keys() {
245 if !covered.contains(fragment_id) {
246 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247 }
248 }
249
250 Ok(ensembles)
251}
252
/// Renders concrete actors (ids, worker placement, vnode bitmaps) for the new
/// fragments of a streaming job, one no-shuffle ensemble at a time, so that
/// fragments connected by no-shuffle dispatch get aligned layouts.
///
/// For each ensemble the actor layout ("template") is determined first:
/// - if the ensemble includes fragments that already exist in `database_info`
///   (i.e. not among `fragments` being created), the template is taken from
///   the first such fragment and every other existing fragment is asserted to
///   be aligned with it;
/// - otherwise a fresh template is rendered from `streaming_job_model`,
///   `worker_map` and `adaptive_parallelism_strategy`, after asserting that
///   all component fragments share the same vnode count.
///
/// New actor ids are allocated from `actor_id_counter`; `definition` and
/// `ctx` are copied into every rendered `StreamActor`.
///
/// # Errors
/// Propagates errors from `EnsembleActorTemplate::render_new`.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // fragment id -> (actor id -> (worker, optional vnode bitmap)).
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Fragments of this ensemble that already exist in the database
        // (i.e. not part of the job being created).
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            // Reuse the layout of the first existing fragment as the template.
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // Every other existing fragment in the ensemble must agree with it.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // All fragments are new: take distribution type and vnode count
            // from the first component and render a fresh layout.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // No-shuffle alignment requires identical vnode counts.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                    different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Align each *new* fragment of the ensemble to the chosen template,
        // allocating fresh actor ids from the counter.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                // Existing fragment: its actors are already placed.
                continue;
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Materialize the assignments into `StreamActor`s and record locations.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
380impl DatabaseCheckpointControl {
381 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
383 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
384 let node_actors =
385 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
386 (table_ids_to_commit, node_actors)
387 }
388
389 fn apply_simple_command(
393 &self,
394 mutation: Option<Mutation>,
395 command_name: &'static str,
396 ) -> ApplyCommandResult {
397 let (table_ids, node_actors) = self.collect_base_info();
398 (
399 mutation,
400 table_ids,
401 None,
402 node_actors,
403 PostCollectCommand::Command(command_name.to_owned()),
404 )
405 }
406
407 pub(super) fn apply_command(
410 &mut self,
411 command: Option<Command>,
412 notifiers: &mut Vec<Notifier>,
413 barrier_info: BarrierInfo,
414 partial_graph_manager: &mut PartialGraphManager,
415 hummock_version_stats: &HummockVersionStats,
416 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
417 worker_nodes: &HashMap<WorkerId, WorkerNode>,
418 ) -> MetaResult<ApplyCommandInfo> {
419 debug_assert!(
420 !matches!(
421 command,
422 Some(Command::RescheduleIntent {
423 reschedule_plan: None,
424 ..
425 })
426 ),
427 "reschedule intent must be resolved before apply"
428 );
429 if matches!(
430 command,
431 Some(Command::RescheduleIntent {
432 reschedule_plan: None,
433 ..
434 })
435 ) {
436 bail!("reschedule intent must be resolved before apply");
437 }
438
439 fn resolve_source_splits(
444 info: &CreateStreamingJobCommandInfo,
445 render_result: &RenderResult,
446 actor_no_shuffle: &ActorNewNoShuffle,
447 database_info: &InflightDatabaseInfo,
448 ) -> MetaResult<SplitAssignment> {
449 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
450 .stream_actors
451 .iter()
452 .map(|(fragment_id, actors)| {
453 (
454 *fragment_id,
455 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
456 )
457 })
458 .collect();
459 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
460 &info.stream_job_fragments,
461 &info.init_split_assignment,
462 &fragment_actor_ids,
463 )?;
464 resolved.extend(SourceManager::resolve_backfill_splits(
465 &info.stream_job_fragments,
466 actor_no_shuffle,
467 |fragment_id, actor_id| {
468 database_info
469 .fragment(fragment_id)
470 .actors
471 .get(&actor_id)
472 .map(|info| info.splits.clone())
473 },
474 )?);
475 Ok(resolved)
476 }
477
478 let mut throttle_for_creating_jobs: Option<(
480 HashSet<JobId>,
481 HashMap<FragmentId, ThrottleConfig>,
482 )> = None;
483
484 let (
488 mutation,
489 mut table_ids_to_commit,
490 mut actors_to_create,
491 mut node_actors,
492 post_collect_command,
493 ) = match command {
494 None => self.apply_simple_command(None, "barrier"),
495 Some(Command::CreateStreamingJob {
496 mut info,
497 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
498 cross_db_snapshot_backfill_info,
499 }) => {
500 let ensembles = resolve_no_shuffle_ensembles(
501 &info.stream_job_fragments,
502 &info.upstream_fragment_downstreams,
503 )?;
504 let actors = render_actors(
505 &info.stream_job_fragments,
506 &self.database_info,
507 &info.definition,
508 &info.stream_job_fragments.inner.ctx,
509 &info.streaming_job_model,
510 partial_graph_manager
511 .control_stream_manager()
512 .env
513 .actor_id_generator(),
514 worker_nodes,
515 adaptive_parallelism_strategy,
516 &ensembles,
517 &info.database_resource_group,
518 )?;
519 {
520 assert!(!self.state.is_paused());
521 let snapshot_epoch = barrier_info.prev_epoch();
522 for snapshot_backfill_epoch in snapshot_backfill_info
524 .upstream_mv_table_id_to_backfill_epoch
525 .values_mut()
526 {
527 assert_eq!(
528 snapshot_backfill_epoch.replace(snapshot_epoch),
529 None,
530 "must not set previously"
531 );
532 }
533 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
534 fill_snapshot_backfill_epoch(
535 &mut fragment.nodes,
536 Some(&snapshot_backfill_info),
537 &cross_db_snapshot_backfill_info,
538 )?;
539 }
540 let job_id = info.stream_job_fragments.stream_job_id();
541 let snapshot_backfill_upstream_tables = snapshot_backfill_info
542 .upstream_mv_table_id_to_backfill_epoch
543 .keys()
544 .cloned()
545 .collect();
546
547 let mut edges = self.database_info.build_edge(
549 Some((&info, true)),
550 None,
551 None,
552 partial_graph_manager.control_stream_manager(),
553 &actors.stream_actors,
554 &actors.actor_location,
555 );
556 let resolved_split_assignment = resolve_source_splits(
558 &info,
559 &actors,
560 edges.actor_new_no_shuffle(),
561 &self.database_info,
562 )?;
563
564 let Entry::Vacant(entry) =
565 self.independent_checkpoint_job_controls.entry(job_id)
566 else {
567 panic!("duplicated creating snapshot backfill job {job_id}");
568 };
569
570 let job = CreatingStreamingJobControl::new(
571 entry,
572 CreateSnapshotBackfillJobCommandInfo {
573 info: info.clone(),
574 snapshot_backfill_info: snapshot_backfill_info.clone(),
575 cross_db_snapshot_backfill_info,
576 resolved_split_assignment: resolved_split_assignment.clone(),
577 },
578 take(notifiers),
579 snapshot_backfill_upstream_tables,
580 snapshot_epoch,
581 hummock_version_stats,
582 partial_graph_manager,
583 &mut edges,
584 &resolved_split_assignment,
585 &actors,
586 )?;
587
588 if let Some(fragment_infos) = job.fragment_infos() {
589 self.database_info.shared_actor_infos.upsert(
590 self.database_id,
591 fragment_infos.values().map(|f| (f, job_id)),
592 );
593 }
594
595 for upstream_mv_table_id in snapshot_backfill_info
596 .upstream_mv_table_id_to_backfill_epoch
597 .keys()
598 {
599 self.database_info.register_subscriber(
600 upstream_mv_table_id.as_job_id(),
601 info.streaming_job.id().as_subscriber_id(),
602 SubscriberType::SnapshotBackfill,
603 );
604 }
605
606 let mutation = Command::create_streaming_job_to_mutation(
607 &info,
608 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
609 self.state.is_paused(),
610 &mut edges,
611 partial_graph_manager.control_stream_manager(),
612 None,
613 &resolved_split_assignment,
614 &actors.stream_actors,
615 &actors.actor_location,
616 )?;
617
618 let (table_ids, node_actors) = self.collect_base_info();
619 (
620 Some(mutation),
621 table_ids,
622 None,
623 node_actors,
624 PostCollectCommand::barrier(),
625 )
626 }
627 }
628 Some(Command::CreateStreamingJob {
629 mut info,
630 job_type,
631 cross_db_snapshot_backfill_info,
632 }) => {
633 let ensembles = resolve_no_shuffle_ensembles(
634 &info.stream_job_fragments,
635 &info.upstream_fragment_downstreams,
636 )?;
637 let actors = render_actors(
638 &info.stream_job_fragments,
639 &self.database_info,
640 &info.definition,
641 &info.stream_job_fragments.inner.ctx,
642 &info.streaming_job_model,
643 partial_graph_manager
644 .control_stream_manager()
645 .env
646 .actor_id_generator(),
647 worker_nodes,
648 adaptive_parallelism_strategy,
649 &ensembles,
650 &info.database_resource_group,
651 )?;
652 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
653 fill_snapshot_backfill_epoch(
654 &mut fragment.nodes,
655 None,
656 &cross_db_snapshot_backfill_info,
657 )?;
658 }
659
660 let new_upstream_sink =
662 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
663 Some(ctx)
664 } else {
665 None
666 };
667
668 let mut edges = self.database_info.build_edge(
669 Some((&info, false)),
670 None,
671 new_upstream_sink,
672 partial_graph_manager.control_stream_manager(),
673 &actors.stream_actors,
674 &actors.actor_location,
675 );
676 let resolved_split_assignment = resolve_source_splits(
678 &info,
679 &actors,
680 edges.actor_new_no_shuffle(),
681 &self.database_info,
682 )?;
683
684 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
686 let (fragment, _) =
687 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
688 .expect("should have parallel cdc fragment");
689 Some(CdcTableBackfillTracker::new(
690 fragment.fragment_id,
691 splits.clone(),
692 ))
693 } else {
694 None
695 };
696 self.database_info
697 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
698 self.database_info.pre_apply_new_fragments(
699 info.stream_job_fragments
700 .new_fragment_info(
701 &actors.stream_actors,
702 &actors.actor_location,
703 &resolved_split_assignment,
704 )
705 .map(|(fragment_id, fragment_infos)| {
706 (fragment_id, info.streaming_job.id(), fragment_infos)
707 }),
708 );
709 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
710 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
711 self.database_info.pre_apply_add_node_upstream(
712 downstream_fragment_id,
713 &PbUpstreamSinkInfo {
714 upstream_fragment_id: ctx.sink_fragment_id,
715 sink_output_schema: ctx.sink_output_fields.clone(),
716 project_exprs: ctx.project_exprs.clone(),
717 },
718 );
719 }
720
721 let (table_ids, node_actors) = self.collect_base_info();
722
723 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
725 &info,
726 &mut edges,
727 &actors.stream_actors,
728 &actors.actor_location,
729 ));
730
731 let actor_cdc_table_snapshot_splits = self
733 .database_info
734 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
735
736 let is_currently_paused = self.state.is_paused();
738 let mutation = Command::create_streaming_job_to_mutation(
739 &info,
740 &job_type,
741 is_currently_paused,
742 &mut edges,
743 partial_graph_manager.control_stream_manager(),
744 actor_cdc_table_snapshot_splits,
745 &resolved_split_assignment,
746 &actors.stream_actors,
747 &actors.actor_location,
748 )?;
749
750 (
751 Some(mutation),
752 table_ids,
753 actors_to_create,
754 node_actors,
755 PostCollectCommand::CreateStreamingJob {
756 info,
757 job_type,
758 cross_db_snapshot_backfill_info,
759 resolved_split_assignment,
760 },
761 )
762 }
763
764 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
765
766 Some(Command::Pause) => {
767 let prev_is_paused = self.state.is_paused();
768 self.state.set_is_paused(true);
769 let mutation = Command::pause_to_mutation(prev_is_paused);
770 let (table_ids, node_actors) = self.collect_base_info();
771 (
772 mutation,
773 table_ids,
774 None,
775 node_actors,
776 PostCollectCommand::Command("Pause".to_owned()),
777 )
778 }
779
780 Some(Command::Resume) => {
781 let prev_is_paused = self.state.is_paused();
782 self.state.set_is_paused(false);
783 let mutation = Command::resume_to_mutation(prev_is_paused);
784 let (table_ids, node_actors) = self.collect_base_info();
785 (
786 mutation,
787 table_ids,
788 None,
789 node_actors,
790 PostCollectCommand::Command("Resume".to_owned()),
791 )
792 }
793
794 Some(Command::Throttle { jobs, config }) => {
795 let mutation = Some(Command::throttle_to_mutation(&config));
796 throttle_for_creating_jobs = Some((jobs, config));
797 self.apply_simple_command(mutation, "Throttle")
798 }
799
800 Some(Command::DropStreamingJobs {
801 streaming_job_ids,
802 unregistered_state_table_ids,
803 unregistered_fragment_ids,
804 dropped_sink_fragment_by_targets,
805 }) => {
806 let actors = self
807 .database_info
808 .fragment_infos()
809 .filter(|fragment| {
810 self.database_info
811 .job_id_by_fragment(fragment.fragment_id)
812 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
813 })
814 .flat_map(|fragment| fragment.actors.keys().copied())
815 .collect::<Vec<_>>();
816
817 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
819 self.database_info
820 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
821 }
822
823 let (table_ids, node_actors) = self.collect_base_info();
824
825 self.database_info
827 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
828
829 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
830 &actors,
831 &dropped_sink_fragment_by_targets,
832 ));
833 (
834 mutation,
835 table_ids,
836 None,
837 node_actors,
838 PostCollectCommand::DropStreamingJobs {
839 streaming_job_ids,
840 unregistered_state_table_ids,
841 },
842 )
843 }
844
845 Some(Command::RescheduleIntent {
846 reschedule_plan, ..
847 }) => {
848 let ReschedulePlan {
849 reschedules,
850 fragment_actors,
851 } = reschedule_plan
852 .as_ref()
853 .expect("reschedule intent should be resolved in global barrier worker");
854
855 for (fragment_id, reschedule) in reschedules {
857 self.database_info.pre_apply_reschedule(
858 *fragment_id,
859 reschedule
860 .added_actors
861 .iter()
862 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
863 actors.iter().map(|actor_id| {
864 (
865 *actor_id,
866 InflightActorInfo {
867 worker_id: *node_id,
868 vnode_bitmap: reschedule
869 .newly_created_actors
870 .get(actor_id)
871 .expect("should exist")
872 .0
873 .0
874 .vnode_bitmap
875 .clone(),
876 splits: reschedule
877 .actor_splits
878 .get(actor_id)
879 .cloned()
880 .unwrap_or_default(),
881 },
882 )
883 })
884 })
885 .collect(),
886 reschedule
887 .vnode_bitmap_updates
888 .iter()
889 .filter(|(actor_id, _)| {
890 !reschedule.newly_created_actors.contains_key(*actor_id)
891 })
892 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
893 .collect(),
894 reschedule.actor_splits.clone(),
895 );
896 }
897
898 let (table_ids, node_actors) = self.collect_base_info();
899
900 let actors_to_create = Some(Command::reschedule_actors_to_create(
902 reschedules,
903 fragment_actors,
904 &self.database_info,
905 partial_graph_manager.control_stream_manager(),
906 ));
907
908 self.database_info
910 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
911 (
912 *fragment_id,
913 reschedule.removed_actors.iter().cloned().collect(),
914 )
915 }));
916
917 let mutation = Command::reschedule_to_mutation(
919 reschedules,
920 fragment_actors,
921 partial_graph_manager.control_stream_manager(),
922 &mut self.database_info,
923 )?;
924
925 let reschedules = reschedule_plan
926 .expect("reschedule intent should be resolved in global barrier worker")
927 .reschedules;
928 (
929 mutation,
930 table_ids,
931 actors_to_create,
932 node_actors,
933 PostCollectCommand::Reschedule { reschedules },
934 )
935 }
936
937 Some(Command::ReplaceStreamJob(plan)) => {
938 let ensembles = resolve_no_shuffle_ensembles(
939 &plan.new_fragments,
940 &plan.upstream_fragment_downstreams,
941 )?;
942 let mut render_result = render_actors(
943 &plan.new_fragments,
944 &self.database_info,
945 "", &plan.new_fragments.inner.ctx,
947 &plan.streaming_job_model,
948 partial_graph_manager
949 .control_stream_manager()
950 .env
951 .actor_id_generator(),
952 worker_nodes,
953 adaptive_parallelism_strategy,
954 &ensembles,
955 &plan.database_resource_group,
956 )?;
957
958 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
961 let actor_id_counter = partial_graph_manager
962 .control_stream_manager()
963 .env
964 .actor_id_generator();
965 for sink_ctx in sinks {
966 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
967 let original_frag_info = self.database_info.fragment(original_fragment_id);
968 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
969 original_frag_info,
970 );
971 let new_aligner = ComponentFragmentAligner::new_persistent(
972 &actor_template,
973 actor_id_counter,
974 );
975 let distribution_type: DistributionType =
976 sink_ctx.new_fragment.distribution_type.into();
977 let actor_assignments =
978 new_aligner.align_component_actor(distribution_type);
979 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
980 let mut actors = Vec::with_capacity(actor_assignments.len());
981 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
982 render_result.actor_location.insert(actor_id, *worker_id);
983 actors.push(StreamActor {
984 actor_id,
985 fragment_id: new_fragment_id,
986 vnode_bitmap: vnode_bitmap.clone(),
987 mview_definition: String::new(),
988 expr_context: Some(sink_ctx.ctx.to_expr_context()),
989 config_override: sink_ctx.ctx.config_override.clone(),
990 });
991 }
992 render_result.stream_actors.insert(new_fragment_id, actors);
993 }
994 }
995
996 let mut edges = self.database_info.build_edge(
998 None,
999 Some(&plan),
1000 None,
1001 partial_graph_manager.control_stream_manager(),
1002 &render_result.stream_actors,
1003 &render_result.actor_location,
1004 );
1005
1006 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1008 .stream_actors
1009 .iter()
1010 .map(|(fragment_id, actors)| {
1011 (
1012 *fragment_id,
1013 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1014 )
1015 })
1016 .collect();
1017 let resolved_split_assignment = match &plan.split_plan {
1018 ReplaceJobSplitPlan::Discovered(discovered) => {
1019 SourceManager::resolve_fragment_to_actor_splits(
1020 &plan.new_fragments,
1021 discovered,
1022 &fragment_actor_ids,
1023 )?
1024 }
1025 ReplaceJobSplitPlan::AlignFromPrevious => {
1026 SourceManager::resolve_replace_source_splits(
1027 &plan.new_fragments,
1028 &plan.replace_upstream,
1029 edges.actor_new_no_shuffle(),
1030 |_fragment_id, actor_id| {
1031 self.database_info.fragment_infos().find_map(|fragment| {
1032 fragment
1033 .actors
1034 .get(&actor_id)
1035 .map(|info| info.splits.clone())
1036 })
1037 },
1038 )?
1039 }
1040 };
1041
1042 self.database_info.pre_apply_new_fragments(
1044 plan.new_fragments
1045 .new_fragment_info(
1046 &render_result.stream_actors,
1047 &render_result.actor_location,
1048 &resolved_split_assignment,
1049 )
1050 .map(|(fragment_id, new_fragment)| {
1051 (fragment_id, plan.streaming_job.id(), new_fragment)
1052 }),
1053 );
1054 for (fragment_id, replace_map) in &plan.replace_upstream {
1055 self.database_info
1056 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1057 }
1058 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1059 self.database_info
1060 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1061 (
1062 sink.new_fragment.fragment_id,
1063 sink.original_sink.id.as_job_id(),
1064 sink.new_fragment_info(
1065 &render_result.stream_actors,
1066 &render_result.actor_location,
1067 ),
1068 )
1069 }));
1070 }
1071
1072 let (table_ids, node_actors) = self.collect_base_info();
1073
1074 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1076 &plan,
1077 &mut edges,
1078 &self.database_info,
1079 &render_result.stream_actors,
1080 &render_result.actor_location,
1081 ));
1082
1083 let mutation = Command::replace_stream_job_to_mutation(
1086 &plan,
1087 &mut edges,
1088 &mut self.database_info,
1089 &resolved_split_assignment,
1090 )?;
1091
1092 {
1094 let mut fragment_ids_to_remove: Vec<_> = plan
1095 .old_fragments
1096 .fragments
1097 .values()
1098 .map(|f| f.fragment_id)
1099 .collect();
1100 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1101 fragment_ids_to_remove
1102 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1103 }
1104 self.database_info
1105 .post_apply_remove_fragments(fragment_ids_to_remove);
1106 }
1107
1108 (
1109 mutation,
1110 table_ids,
1111 actors_to_create,
1112 node_actors,
1113 PostCollectCommand::ReplaceStreamJob {
1114 plan,
1115 resolved_split_assignment,
1116 },
1117 )
1118 }
1119
1120 Some(Command::SourceChangeSplit(split_state)) => {
1121 self.database_info.pre_apply_split_assignments(
1123 split_state
1124 .split_assignment
1125 .iter()
1126 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1127 );
1128
1129 let mutation = Some(Command::source_change_split_to_mutation(
1130 &split_state.split_assignment,
1131 ));
1132 let (table_ids, node_actors) = self.collect_base_info();
1133 (
1134 mutation,
1135 table_ids,
1136 None,
1137 node_actors,
1138 PostCollectCommand::SourceChangeSplit {
1139 split_assignment: split_state.split_assignment,
1140 },
1141 )
1142 }
1143
1144 Some(Command::CreateSubscription {
1145 subscription_id,
1146 upstream_mv_table_id,
1147 retention_second,
1148 }) => {
1149 self.database_info.register_subscriber(
1150 upstream_mv_table_id.as_job_id(),
1151 subscription_id.as_subscriber_id(),
1152 SubscriberType::Subscription(retention_second),
1153 );
1154 let mutation = Some(Command::create_subscription_to_mutation(
1155 upstream_mv_table_id,
1156 subscription_id,
1157 ));
1158 let (table_ids, node_actors) = self.collect_base_info();
1159 (
1160 mutation,
1161 table_ids,
1162 None,
1163 node_actors,
1164 PostCollectCommand::CreateSubscription { subscription_id },
1165 )
1166 }
1167
1168 Some(Command::DropSubscription {
1169 subscription_id,
1170 upstream_mv_table_id,
1171 }) => {
1172 if self
1173 .database_info
1174 .unregister_subscriber(
1175 upstream_mv_table_id.as_job_id(),
1176 subscription_id.as_subscriber_id(),
1177 )
1178 .is_none()
1179 {
1180 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1181 }
1182 let mutation = Some(Command::drop_subscription_to_mutation(
1183 upstream_mv_table_id,
1184 subscription_id,
1185 ));
1186 let (table_ids, node_actors) = self.collect_base_info();
1187 (
1188 mutation,
1189 table_ids,
1190 None,
1191 node_actors,
1192 PostCollectCommand::Command("DropSubscription".to_owned()),
1193 )
1194 }
1195
1196 Some(Command::AlterSubscriptionRetention {
1197 subscription_id,
1198 upstream_mv_table_id,
1199 retention_second,
1200 }) => {
1201 self.database_info.update_subscription_retention(
1202 upstream_mv_table_id.as_job_id(),
1203 subscription_id.as_subscriber_id(),
1204 retention_second,
1205 );
1206 self.apply_simple_command(None, "AlterSubscriptionRetention")
1207 }
1208
1209 Some(Command::ConnectorPropsChange(config)) => {
1210 let mutation = Some(Command::connector_props_change_to_mutation(&config));
1211 let (table_ids, node_actors) = self.collect_base_info();
1212 (
1213 mutation,
1214 table_ids,
1215 None,
1216 node_actors,
1217 PostCollectCommand::ConnectorPropsChange(config),
1218 )
1219 }
1220
1221 Some(Command::Refresh {
1222 table_id,
1223 associated_source_id,
1224 }) => {
1225 let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1226 self.apply_simple_command(mutation, "Refresh")
1227 }
1228
1229 Some(Command::ListFinish {
1230 table_id: _,
1231 associated_source_id,
1232 }) => {
1233 let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1234 self.apply_simple_command(mutation, "ListFinish")
1235 }
1236
1237 Some(Command::LoadFinish {
1238 table_id: _,
1239 associated_source_id,
1240 }) => {
1241 let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1242 self.apply_simple_command(mutation, "LoadFinish")
1243 }
1244
1245 Some(Command::ResetSource { source_id }) => {
1246 let mutation = Some(Command::reset_source_to_mutation(source_id));
1247 self.apply_simple_command(mutation, "ResetSource")
1248 }
1249
1250 Some(Command::ResumeBackfill { target }) => {
1251 let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1252 let (table_ids, node_actors) = self.collect_base_info();
1253 (
1254 mutation,
1255 table_ids,
1256 None,
1257 node_actors,
1258 PostCollectCommand::ResumeBackfill { target },
1259 )
1260 }
1261
1262 Some(Command::InjectSourceOffsets {
1263 source_id,
1264 split_offsets,
1265 }) => {
1266 let mutation = Some(Command::inject_source_offsets_to_mutation(
1267 source_id,
1268 &split_offsets,
1269 ));
1270 self.apply_simple_command(mutation, "InjectSourceOffsets")
1271 }
1272 };
1273
1274 let mut finished_snapshot_backfill_jobs = HashSet::new();
1275 let mutation = match mutation {
1276 Some(mutation) => Some(mutation),
1277 None => {
1278 let mut finished_snapshot_backfill_job_info = HashMap::new();
1279 if barrier_info.kind.is_checkpoint() {
1280 for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1281 let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1282 job;
1283 if creating_job.should_merge_to_upstream() {
1284 let info = creating_job
1285 .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1286 finished_snapshot_backfill_job_info
1287 .try_insert(job_id, info)
1288 .expect("non-duplicated");
1289 }
1290 }
1291 }
1292
1293 if !finished_snapshot_backfill_job_info.is_empty() {
1294 let actors_to_create = actors_to_create.get_or_insert_default();
1295 let mut subscriptions_to_drop = vec![];
1296 let mut dispatcher_update = vec![];
1297 let mut actor_splits = HashMap::new();
1298 for (job_id, info) in finished_snapshot_backfill_job_info {
1299 finished_snapshot_backfill_jobs.insert(job_id);
1300 subscriptions_to_drop.extend(
1301 info.snapshot_backfill_upstream_tables.iter().map(
1302 |upstream_table_id| PbSubscriptionUpstreamInfo {
1303 subscriber_id: job_id.as_subscriber_id(),
1304 upstream_mv_table_id: *upstream_table_id,
1305 },
1306 ),
1307 );
1308 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1309 assert_matches!(
1310 self.database_info.unregister_subscriber(
1311 upstream_mv_table_id.as_job_id(),
1312 job_id.as_subscriber_id()
1313 ),
1314 Some(SubscriberType::SnapshotBackfill)
1315 );
1316 }
1317
1318 table_ids_to_commit.extend(
1319 info.fragment_infos
1320 .values()
1321 .flat_map(|fragment| fragment.state_table_ids.iter())
1322 .copied(),
1323 );
1324
1325 let actor_len = info
1326 .fragment_infos
1327 .values()
1328 .map(|fragment| fragment.actors.len() as u64)
1329 .sum();
1330 let id_gen = GlobalActorIdGen::new(
1331 partial_graph_manager
1332 .control_stream_manager()
1333 .env
1334 .actor_id_generator(),
1335 actor_len,
1336 );
1337 let mut next_local_actor_id = 0;
1338 let actor_mapping: HashMap<_, _> = info
1340 .fragment_infos
1341 .values()
1342 .flat_map(|fragment| fragment.actors.keys())
1343 .map(|old_actor_id| {
1344 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1345 next_local_actor_id += 1;
1346 (*old_actor_id, new_actor_id.as_global_id())
1347 })
1348 .collect();
1349 let actor_mapping = &actor_mapping;
1350 let new_stream_actors: HashMap<_, _> = info
1351 .stream_actors
1352 .into_iter()
1353 .map(|(old_actor_id, mut actor)| {
1354 let new_actor_id = actor_mapping[&old_actor_id];
1355 actor.actor_id = new_actor_id;
1356 (new_actor_id, actor)
1357 })
1358 .collect();
1359 let new_fragment_info: HashMap<_, _> = info
1360 .fragment_infos
1361 .into_iter()
1362 .map(|(fragment_id, mut fragment)| {
1363 let actors = take(&mut fragment.actors);
1364 fragment.actors = actors
1365 .into_iter()
1366 .map(|(old_actor_id, actor)| {
1367 let new_actor_id = actor_mapping[&old_actor_id];
1368 (new_actor_id, actor)
1369 })
1370 .collect();
1371 (fragment_id, fragment)
1372 })
1373 .collect();
1374 actor_splits.extend(
1375 new_fragment_info
1376 .values()
1377 .flat_map(|fragment| &fragment.actors)
1378 .map(|(actor_id, actor)| {
1379 (
1380 *actor_id,
1381 ConnectorSplits {
1382 splits: actor
1383 .splits
1384 .iter()
1385 .map(ConnectorSplit::from)
1386 .collect(),
1387 },
1388 )
1389 }),
1390 );
1391 let partial_graph_id = to_partial_graph_id(self.database_id, None);
1393 let mut edge_builder = FragmentEdgeBuilder::new(
1394 info.upstream_fragment_downstreams
1395 .keys()
1396 .map(|upstream_fragment_id| {
1397 self.database_info.fragment(*upstream_fragment_id)
1398 })
1399 .chain(new_fragment_info.values())
1400 .map(|fragment| {
1401 (
1402 fragment.fragment_id,
1403 EdgeBuilderFragmentInfo::from_inflight(
1404 fragment,
1405 partial_graph_id,
1406 partial_graph_manager.control_stream_manager(),
1407 ),
1408 )
1409 }),
1410 );
1411 edge_builder.add_relations(&info.upstream_fragment_downstreams);
1412 edge_builder.add_relations(&info.downstreams);
1413 let mut edges = edge_builder.build();
1414 let new_actors_to_create = edges.collect_actors_to_create(
1415 new_fragment_info.values().map(|fragment| {
1416 (
1417 fragment.fragment_id,
1418 &fragment.nodes,
1419 fragment.actors.iter().map(|(actor_id, actor)| {
1420 (&new_stream_actors[actor_id], actor.worker_id)
1421 }),
1422 [], )
1424 }),
1425 );
1426 dispatcher_update.extend(
1427 info.upstream_fragment_downstreams.keys().flat_map(
1428 |upstream_fragment_id| {
1429 let new_actor_dispatchers = edges
1430 .dispatchers
1431 .remove(upstream_fragment_id)
1432 .expect("should exist");
1433 new_actor_dispatchers.into_iter().flat_map(
1434 |(upstream_actor_id, dispatchers)| {
1435 dispatchers.into_iter().map(move |dispatcher| {
1436 PbDispatcherUpdate {
1437 actor_id: upstream_actor_id,
1438 dispatcher_id: dispatcher.dispatcher_id,
1439 hash_mapping: dispatcher.hash_mapping,
1440 removed_downstream_actor_id: dispatcher
1441 .downstream_actor_id
1442 .iter()
1443 .map(|new_downstream_actor_id| {
1444 actor_mapping
1445 .iter()
1446 .find_map(
1447 |(old_actor_id, new_actor_id)| {
1448 (new_downstream_actor_id
1449 == new_actor_id)
1450 .then_some(*old_actor_id)
1451 },
1452 )
1453 .expect("should exist")
1454 })
1455 .collect(),
1456 added_downstream_actor_id: dispatcher
1457 .downstream_actor_id,
1458 }
1459 })
1460 },
1461 )
1462 },
1463 ),
1464 );
1465 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1466 for (worker_id, worker_actors) in new_actors_to_create {
1467 node_actors.entry(worker_id).or_default().extend(
1468 worker_actors.values().flat_map(|(_, actors, _)| {
1469 actors.iter().map(|(actor, _, _)| actor.actor_id)
1470 }),
1471 );
1472 actors_to_create
1473 .entry(worker_id)
1474 .or_default()
1475 .extend(worker_actors);
1476 }
1477 self.database_info.add_existing(InflightStreamingJobInfo {
1478 job_id,
1479 fragment_infos: new_fragment_info,
1480 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
1482 cdc_table_backfill_tracker: None, });
1484 }
1485
1486 Some(PbMutation::Update(PbUpdateMutation {
1487 dispatcher_update,
1488 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
1492 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
1496 }))
1497 } else {
1498 let fragment_ids = self.database_info.take_pending_backfill_nodes();
1499 if fragment_ids.is_empty() {
1500 None
1501 } else {
1502 Some(PbMutation::StartFragmentBackfill(
1503 PbStartFragmentBackfillMutation { fragment_ids },
1504 ))
1505 }
1506 }
1507 }
1508 };
1509
1510 for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1512 let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) = job;
1513 if !finished_snapshot_backfill_jobs.contains(job_id) {
1514 let throttle_mutation = if let Some((ref jobs, ref config)) =
1515 throttle_for_creating_jobs
1516 && jobs.contains(job_id)
1517 {
1518 assert_eq!(
1519 jobs.len(),
1520 1,
1521 "should not alter rate limit of snapshot backfill job with other jobs"
1522 );
1523 Some((
1524 Mutation::Throttle(ThrottleMutation {
1525 fragment_throttle: config
1526 .iter()
1527 .map(|(fragment_id, config)| (*fragment_id, *config))
1528 .collect(),
1529 }),
1530 take(notifiers),
1531 ))
1532 } else {
1533 None
1534 };
1535 creating_job.on_new_upstream_barrier(
1536 partial_graph_manager,
1537 &barrier_info,
1538 throttle_mutation,
1539 )?;
1540 }
1541 }
1542
1543 partial_graph_manager.inject_barrier(
1544 to_partial_graph_id(self.database_id, None),
1545 mutation,
1546 &node_actors,
1547 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1548 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1549 actors_to_create,
1550 PartialGraphBarrierInfo::new(
1551 post_collect_command,
1552 barrier_info,
1553 take(notifiers),
1554 table_ids_to_commit,
1555 ),
1556 )?;
1557
1558 Ok(ApplyCommandInfo {
1559 jobs_to_wait: finished_snapshot_backfill_jobs,
1560 })
1561 }
1562}