1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38 PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
45use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
48use crate::barrier::info::{
49 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
50 SubscriberType,
51};
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::PartialGraphManager;
54use crate::barrier::rpc::to_partial_graph_id;
55use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::scale::{
58 ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
59 build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
60};
61use crate::model::{
62 ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
63 StreamJobActorsToCreate, StreamJobFragmentsToCreate,
64};
65use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
66use crate::stream::{
67 GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
68 fill_snapshot_backfill_epoch,
69};
70
/// Tracks per-database barrier-injection progress for the barrier worker.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The prev epoch of the most recently injected barrier. The next barrier
    /// issued via `next_barrier_info` uses this as its `prev_epoch`.
    in_flight_prev_epoch: TracedEpoch,

    /// Prev epochs of the barriers injected since the last checkpoint barrier.
    /// Drained into `BarrierKind::Checkpoint` when a checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the database is currently paused (toggled by Pause/Resume commands).
    is_paused: bool,
}
85
86impl BarrierWorkerState {
87 pub(super) fn new() -> Self {
88 Self {
89 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
90 pending_non_checkpoint_barriers: vec![],
91 is_paused: false,
92 }
93 }
94
95 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
96 Self {
97 in_flight_prev_epoch,
98 pending_non_checkpoint_barriers: vec![],
99 is_paused,
100 }
101 }
102
103 pub fn is_paused(&self) -> bool {
104 self.is_paused
105 }
106
107 fn set_is_paused(&mut self, is_paused: bool) {
108 if self.is_paused != is_paused {
109 tracing::info!(
110 currently_paused = self.is_paused,
111 newly_paused = is_paused,
112 "update paused state"
113 );
114 self.is_paused = is_paused;
115 }
116 }
117
118 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
119 &self.in_flight_prev_epoch
120 }
121
122 pub fn next_barrier_info(
124 &mut self,
125 is_checkpoint: bool,
126 curr_epoch: TracedEpoch,
127 ) -> BarrierInfo {
128 assert!(
129 self.in_flight_prev_epoch.value() < curr_epoch.value(),
130 "curr epoch regress. {} > {}",
131 self.in_flight_prev_epoch.value(),
132 curr_epoch.value()
133 );
134 let prev_epoch = self.in_flight_prev_epoch.clone();
135 self.in_flight_prev_epoch = curr_epoch.clone();
136 self.pending_non_checkpoint_barriers
137 .push(prev_epoch.value().0);
138 let kind = if is_checkpoint {
139 let epochs = take(&mut self.pending_non_checkpoint_barriers);
140 BarrierKind::Checkpoint(epochs)
141 } else {
142 BarrierKind::Barrier
143 };
144 BarrierInfo {
145 prev_epoch,
146 curr_epoch,
147 kind,
148 }
149 }
150}
151
/// Information produced by applying a command to a database's checkpoint
/// control, carried along with the injected barrier.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): presumably the max retention per upstream MV subscription —
    // the producer of this field is outside this chunk; confirm units/semantics.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State table ids whose changes should be committed with this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    // NOTE(review): looks like the set of creating jobs whose progress must be
    // awaited before this barrier completes — confirm against the caller.
    pub jobs_to_wait: HashSet<JobId>,
    /// The command to handle after the barrier is collected.
    pub command: PostCollectCommand,
}
158
/// Intermediate result of applying a single command, in positional order:
/// (optional barrier mutation, table ids to commit, actors to create (if any),
/// per-worker actors to collect the barrier from, post-collect command).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
168
/// Output of [`render_actors`]: the rendered actors and their worker placement.
pub(super) struct RenderResult {
    /// Rendered stream actors, grouped by fragment id.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker assignment for every rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
176
177fn resolve_no_shuffle_ensembles(
186 fragments: &StreamJobFragmentsToCreate,
187 upstream_fragment_downstreams: &FragmentDownstreamRelation,
188) -> MetaResult<Vec<NoShuffleEnsemble>> {
189 let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
191
192 for (upstream_fid, relations) in &fragments.downstreams {
194 for rel in relations {
195 if rel.dispatcher_type == DispatcherType::NoShuffle {
196 new_no_shuffle
197 .entry(*upstream_fid)
198 .or_default()
199 .insert(rel.downstream_fragment_id);
200 }
201 }
202 }
203
204 for (upstream_fid, relations) in upstream_fragment_downstreams {
206 for rel in relations {
207 if rel.dispatcher_type == DispatcherType::NoShuffle {
208 new_no_shuffle
209 .entry(*upstream_fid)
210 .or_default()
211 .insert(rel.downstream_fragment_id);
212 }
213 }
214 }
215
216 let mut ensembles = if new_no_shuffle.is_empty() {
217 Vec::new()
218 } else {
219 let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
221 .iter()
222 .flat_map(|(upstream_fid, downstream_fids)| {
223 downstream_fids
224 .iter()
225 .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
226 })
227 .collect();
228
229 let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
230 .iter()
231 .flat_map(|(u, d)| [*u, *d])
232 .collect::<HashSet<_>>()
233 .into_iter()
234 .collect();
235
236 let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
237 find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
238 };
239
240 let covered: HashSet<FragmentId> = ensembles
242 .iter()
243 .flat_map(|e| e.component_fragments())
244 .collect();
245 for fragment_id in fragments.inner.fragments.keys() {
246 if !covered.contains(fragment_id) {
247 ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
248 }
249 }
250
251 Ok(ensembles)
252}
253
/// Render concrete actors (worker placement + vnode bitmaps) for the fragments
/// of a job to create, one no-shuffle ensemble at a time, so that all fragments
/// within the same ensemble get aligned actor layouts.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // fragment_id -> (actor_id -> (worker, optional vnode bitmap))
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Ensemble members that are NOT part of the new job, i.e. fragments
        // that already exist in the inflight database info.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            // Reuse the layout of an existing member; every other existing
            // member must already be aligned with it.
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // The ensemble is entirely new: derive a fresh layout. All members
            // must share one vnode count for the layout to be shareable.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                    different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Materialize aligned actor assignments for the ensemble's NEW fragments.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                // Existing fragments keep their current actors untouched.
                continue;
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner = ComponentFragmentAligner::new(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Turn the assignments into `StreamActor`s and an actor -> worker map.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
380impl DatabaseCheckpointControl {
381 fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
383 let table_ids_to_commit = self.database_info.existing_table_ids().collect();
384 let node_actors =
385 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
386 (table_ids_to_commit, node_actors)
387 }
388
389 fn apply_simple_command(
393 &self,
394 mutation: Option<Mutation>,
395 command_name: &'static str,
396 ) -> ApplyCommandResult {
397 let (table_ids, node_actors) = self.collect_base_info();
398 (
399 mutation,
400 table_ids,
401 None,
402 node_actors,
403 PostCollectCommand::Command(command_name.to_owned()),
404 )
405 }
406
407 pub(super) fn apply_command(
410 &mut self,
411 command: Option<Command>,
412 notifiers: &mut Vec<Notifier>,
413 barrier_info: &BarrierInfo,
414 partial_graph_manager: &mut PartialGraphManager,
415 hummock_version_stats: &HummockVersionStats,
416 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
417 worker_nodes: &HashMap<WorkerId, WorkerNode>,
418 ) -> MetaResult<ApplyCommandInfo> {
419 debug_assert!(
420 !matches!(
421 command,
422 Some(Command::RescheduleIntent {
423 reschedule_plan: None,
424 ..
425 })
426 ),
427 "reschedule intent must be resolved before apply"
428 );
429 if matches!(
430 command,
431 Some(Command::RescheduleIntent {
432 reschedule_plan: None,
433 ..
434 })
435 ) {
436 bail!("reschedule intent must be resolved before apply");
437 }
438
439 fn resolve_source_splits(
444 info: &CreateStreamingJobCommandInfo,
445 render_result: &RenderResult,
446 actor_no_shuffle: &ActorNewNoShuffle,
447 database_info: &InflightDatabaseInfo,
448 ) -> MetaResult<SplitAssignment> {
449 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
450 .stream_actors
451 .iter()
452 .map(|(fragment_id, actors)| {
453 (
454 *fragment_id,
455 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
456 )
457 })
458 .collect();
459 let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
460 &info.stream_job_fragments,
461 &info.init_split_assignment,
462 &fragment_actor_ids,
463 )?;
464 resolved.extend(SourceManager::resolve_backfill_splits(
465 &info.stream_job_fragments,
466 actor_no_shuffle,
467 |fragment_id, actor_id| {
468 database_info
469 .fragment(fragment_id)
470 .actors
471 .get(&actor_id)
472 .map(|info| info.splits.clone())
473 },
474 )?);
475 Ok(resolved)
476 }
477
478 let mut throttle_for_creating_jobs: Option<(
480 HashSet<JobId>,
481 HashMap<FragmentId, ThrottleConfig>,
482 )> = None;
483
484 let (
488 mutation,
489 mut table_ids_to_commit,
490 mut actors_to_create,
491 mut node_actors,
492 post_collect_command,
493 ) = match command {
494 None => self.apply_simple_command(None, "barrier"),
495 Some(Command::CreateStreamingJob {
496 mut info,
497 job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
498 cross_db_snapshot_backfill_info,
499 }) => {
500 let ensembles = resolve_no_shuffle_ensembles(
501 &info.stream_job_fragments,
502 &info.upstream_fragment_downstreams,
503 )?;
504 let actors = render_actors(
505 &info.stream_job_fragments,
506 &self.database_info,
507 &info.definition,
508 &info.stream_job_fragments.inner.ctx,
509 &info.streaming_job_model,
510 partial_graph_manager
511 .control_stream_manager()
512 .env
513 .actor_id_generator(),
514 worker_nodes,
515 adaptive_parallelism_strategy,
516 &ensembles,
517 &info.database_resource_group,
518 )?;
519 {
520 assert!(!self.state.is_paused());
521 let snapshot_epoch = barrier_info.prev_epoch();
522 for snapshot_backfill_epoch in snapshot_backfill_info
524 .upstream_mv_table_id_to_backfill_epoch
525 .values_mut()
526 {
527 assert_eq!(
528 snapshot_backfill_epoch.replace(snapshot_epoch),
529 None,
530 "must not set previously"
531 );
532 }
533 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
534 fill_snapshot_backfill_epoch(
535 &mut fragment.nodes,
536 Some(&snapshot_backfill_info),
537 &cross_db_snapshot_backfill_info,
538 )?;
539 }
540 let job_id = info.stream_job_fragments.stream_job_id();
541 let snapshot_backfill_upstream_tables = snapshot_backfill_info
542 .upstream_mv_table_id_to_backfill_epoch
543 .keys()
544 .cloned()
545 .collect();
546
547 let mut edges = self.database_info.build_edge(
549 Some((&info, true)),
550 None,
551 None,
552 partial_graph_manager.control_stream_manager(),
553 &actors.stream_actors,
554 &actors.actor_location,
555 );
556 let resolved_split_assignment = resolve_source_splits(
558 &info,
559 &actors,
560 edges.actor_new_no_shuffle(),
561 &self.database_info,
562 )?;
563
564 let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
565 else {
566 panic!("duplicated creating snapshot backfill job {job_id}");
567 };
568
569 let job = CreatingStreamingJobControl::new(
570 entry,
571 CreateSnapshotBackfillJobCommandInfo {
572 info: info.clone(),
573 snapshot_backfill_info: snapshot_backfill_info.clone(),
574 cross_db_snapshot_backfill_info,
575 resolved_split_assignment: resolved_split_assignment.clone(),
576 },
577 take(notifiers),
578 snapshot_backfill_upstream_tables,
579 snapshot_epoch,
580 hummock_version_stats,
581 partial_graph_manager,
582 &mut edges,
583 &resolved_split_assignment,
584 &actors,
585 )?;
586
587 self.database_info
588 .shared_actor_infos
589 .upsert(self.database_id, job.fragment_infos_with_job_id());
590
591 for upstream_mv_table_id in snapshot_backfill_info
592 .upstream_mv_table_id_to_backfill_epoch
593 .keys()
594 {
595 self.database_info.register_subscriber(
596 upstream_mv_table_id.as_job_id(),
597 info.streaming_job.id().as_subscriber_id(),
598 SubscriberType::SnapshotBackfill,
599 );
600 }
601
602 let mutation = Command::create_streaming_job_to_mutation(
603 &info,
604 &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
605 self.state.is_paused(),
606 &mut edges,
607 partial_graph_manager.control_stream_manager(),
608 None,
609 &resolved_split_assignment,
610 &actors.stream_actors,
611 &actors.actor_location,
612 )?;
613
614 let (table_ids, node_actors) = self.collect_base_info();
615 (
616 Some(mutation),
617 table_ids,
618 None,
619 node_actors,
620 PostCollectCommand::barrier(),
621 )
622 }
623 }
624 Some(Command::CreateStreamingJob {
625 mut info,
626 job_type,
627 cross_db_snapshot_backfill_info,
628 }) => {
629 let ensembles = resolve_no_shuffle_ensembles(
630 &info.stream_job_fragments,
631 &info.upstream_fragment_downstreams,
632 )?;
633 let actors = render_actors(
634 &info.stream_job_fragments,
635 &self.database_info,
636 &info.definition,
637 &info.stream_job_fragments.inner.ctx,
638 &info.streaming_job_model,
639 partial_graph_manager
640 .control_stream_manager()
641 .env
642 .actor_id_generator(),
643 worker_nodes,
644 adaptive_parallelism_strategy,
645 &ensembles,
646 &info.database_resource_group,
647 )?;
648 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
649 fill_snapshot_backfill_epoch(
650 &mut fragment.nodes,
651 None,
652 &cross_db_snapshot_backfill_info,
653 )?;
654 }
655
656 let new_upstream_sink =
658 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
659 Some(ctx)
660 } else {
661 None
662 };
663
664 let mut edges = self.database_info.build_edge(
665 Some((&info, false)),
666 None,
667 new_upstream_sink,
668 partial_graph_manager.control_stream_manager(),
669 &actors.stream_actors,
670 &actors.actor_location,
671 );
672 let resolved_split_assignment = resolve_source_splits(
674 &info,
675 &actors,
676 edges.actor_new_no_shuffle(),
677 &self.database_info,
678 )?;
679
680 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
682 let (fragment, _) =
683 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
684 .expect("should have parallel cdc fragment");
685 Some(CdcTableBackfillTracker::new(
686 fragment.fragment_id,
687 splits.clone(),
688 ))
689 } else {
690 None
691 };
692 self.database_info
693 .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
694 self.database_info.pre_apply_new_fragments(
695 info.stream_job_fragments
696 .new_fragment_info(
697 &actors.stream_actors,
698 &actors.actor_location,
699 &resolved_split_assignment,
700 )
701 .map(|(fragment_id, fragment_infos)| {
702 (fragment_id, info.streaming_job.id(), fragment_infos)
703 }),
704 );
705 if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
706 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
707 self.database_info.pre_apply_add_node_upstream(
708 downstream_fragment_id,
709 &PbUpstreamSinkInfo {
710 upstream_fragment_id: ctx.sink_fragment_id,
711 sink_output_schema: ctx.sink_output_fields.clone(),
712 project_exprs: ctx.project_exprs.clone(),
713 },
714 );
715 }
716
717 let (table_ids, node_actors) = self.collect_base_info();
718
719 let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
721 &info,
722 &mut edges,
723 &actors.stream_actors,
724 &actors.actor_location,
725 ));
726
727 let actor_cdc_table_snapshot_splits = self
729 .database_info
730 .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
731
732 let is_currently_paused = self.state.is_paused();
734 let mutation = Command::create_streaming_job_to_mutation(
735 &info,
736 &job_type,
737 is_currently_paused,
738 &mut edges,
739 partial_graph_manager.control_stream_manager(),
740 actor_cdc_table_snapshot_splits,
741 &resolved_split_assignment,
742 &actors.stream_actors,
743 &actors.actor_location,
744 )?;
745
746 (
747 Some(mutation),
748 table_ids,
749 actors_to_create,
750 node_actors,
751 PostCollectCommand::CreateStreamingJob {
752 info,
753 job_type,
754 cross_db_snapshot_backfill_info,
755 resolved_split_assignment,
756 },
757 )
758 }
759
760 Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
761
762 Some(Command::Pause) => {
763 let prev_is_paused = self.state.is_paused();
764 self.state.set_is_paused(true);
765 let mutation = Command::pause_to_mutation(prev_is_paused);
766 let (table_ids, node_actors) = self.collect_base_info();
767 (
768 mutation,
769 table_ids,
770 None,
771 node_actors,
772 PostCollectCommand::Command("Pause".to_owned()),
773 )
774 }
775
776 Some(Command::Resume) => {
777 let prev_is_paused = self.state.is_paused();
778 self.state.set_is_paused(false);
779 let mutation = Command::resume_to_mutation(prev_is_paused);
780 let (table_ids, node_actors) = self.collect_base_info();
781 (
782 mutation,
783 table_ids,
784 None,
785 node_actors,
786 PostCollectCommand::Command("Resume".to_owned()),
787 )
788 }
789
790 Some(Command::Throttle { jobs, config }) => {
791 let mutation = Some(Command::throttle_to_mutation(&config));
792 throttle_for_creating_jobs = Some((jobs, config));
793 self.apply_simple_command(mutation, "Throttle")
794 }
795
796 Some(Command::DropStreamingJobs {
797 streaming_job_ids,
798 unregistered_state_table_ids,
799 unregistered_fragment_ids,
800 dropped_sink_fragment_by_targets,
801 }) => {
802 let actors = self
803 .database_info
804 .fragment_infos()
805 .filter(|fragment| {
806 self.database_info
807 .job_id_by_fragment(fragment.fragment_id)
808 .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
809 })
810 .flat_map(|fragment| fragment.actors.keys().copied())
811 .collect::<Vec<_>>();
812
813 for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
815 self.database_info
816 .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
817 }
818
819 let (table_ids, node_actors) = self.collect_base_info();
820
821 self.database_info
823 .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
824
825 let mutation = Some(Command::drop_streaming_jobs_to_mutation(
826 &actors,
827 &dropped_sink_fragment_by_targets,
828 ));
829 (
830 mutation,
831 table_ids,
832 None,
833 node_actors,
834 PostCollectCommand::DropStreamingJobs {
835 streaming_job_ids,
836 unregistered_state_table_ids,
837 },
838 )
839 }
840
841 Some(Command::RescheduleIntent {
842 reschedule_plan, ..
843 }) => {
844 let ReschedulePlan {
845 reschedules,
846 fragment_actors,
847 } = reschedule_plan
848 .as_ref()
849 .expect("reschedule intent should be resolved in global barrier worker");
850
851 for (fragment_id, reschedule) in reschedules {
853 self.database_info.pre_apply_reschedule(
854 *fragment_id,
855 reschedule
856 .added_actors
857 .iter()
858 .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
859 actors.iter().map(|actor_id| {
860 (
861 *actor_id,
862 InflightActorInfo {
863 worker_id: *node_id,
864 vnode_bitmap: reschedule
865 .newly_created_actors
866 .get(actor_id)
867 .expect("should exist")
868 .0
869 .0
870 .vnode_bitmap
871 .clone(),
872 splits: reschedule
873 .actor_splits
874 .get(actor_id)
875 .cloned()
876 .unwrap_or_default(),
877 },
878 )
879 })
880 })
881 .collect(),
882 reschedule
883 .vnode_bitmap_updates
884 .iter()
885 .filter(|(actor_id, _)| {
886 !reschedule.newly_created_actors.contains_key(*actor_id)
887 })
888 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
889 .collect(),
890 reschedule.actor_splits.clone(),
891 );
892 }
893
894 let (table_ids, node_actors) = self.collect_base_info();
895
896 let actors_to_create = Some(Command::reschedule_actors_to_create(
898 reschedules,
899 fragment_actors,
900 &self.database_info,
901 partial_graph_manager.control_stream_manager(),
902 ));
903
904 self.database_info
906 .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
907 (
908 *fragment_id,
909 reschedule.removed_actors.iter().cloned().collect(),
910 )
911 }));
912
913 let mutation = Command::reschedule_to_mutation(
915 reschedules,
916 fragment_actors,
917 partial_graph_manager.control_stream_manager(),
918 &mut self.database_info,
919 )?;
920
921 let reschedules = reschedule_plan
922 .expect("reschedule intent should be resolved in global barrier worker")
923 .reschedules;
924 (
925 mutation,
926 table_ids,
927 actors_to_create,
928 node_actors,
929 PostCollectCommand::Reschedule { reschedules },
930 )
931 }
932
933 Some(Command::ReplaceStreamJob(plan)) => {
934 let ensembles = resolve_no_shuffle_ensembles(
935 &plan.new_fragments,
936 &plan.upstream_fragment_downstreams,
937 )?;
938 let mut render_result = render_actors(
939 &plan.new_fragments,
940 &self.database_info,
941 "", &plan.new_fragments.inner.ctx,
943 &plan.streaming_job_model,
944 partial_graph_manager
945 .control_stream_manager()
946 .env
947 .actor_id_generator(),
948 worker_nodes,
949 adaptive_parallelism_strategy,
950 &ensembles,
951 &plan.database_resource_group,
952 )?;
953
954 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
957 let actor_id_counter = partial_graph_manager
958 .control_stream_manager()
959 .env
960 .actor_id_generator();
961 for sink_ctx in sinks {
962 let original_fragment_id = sink_ctx.original_fragment.fragment_id;
963 let original_frag_info = self.database_info.fragment(original_fragment_id);
964 let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
965 original_frag_info,
966 );
967 let new_aligner =
968 ComponentFragmentAligner::new(&actor_template, actor_id_counter);
969 let distribution_type: DistributionType =
970 sink_ctx.new_fragment.distribution_type.into();
971 let actor_assignments =
972 new_aligner.align_component_actor(distribution_type);
973 let new_fragment_id = sink_ctx.new_fragment.fragment_id;
974 let mut actors = Vec::with_capacity(actor_assignments.len());
975 for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
976 render_result.actor_location.insert(actor_id, *worker_id);
977 actors.push(StreamActor {
978 actor_id,
979 fragment_id: new_fragment_id,
980 vnode_bitmap: vnode_bitmap.clone(),
981 mview_definition: String::new(),
982 expr_context: Some(sink_ctx.ctx.to_expr_context()),
983 config_override: sink_ctx.ctx.config_override.clone(),
984 });
985 }
986 render_result.stream_actors.insert(new_fragment_id, actors);
987 }
988 }
989
990 let mut edges = self.database_info.build_edge(
992 None,
993 Some(&plan),
994 None,
995 partial_graph_manager.control_stream_manager(),
996 &render_result.stream_actors,
997 &render_result.actor_location,
998 );
999
1000 let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1002 .stream_actors
1003 .iter()
1004 .map(|(fragment_id, actors)| {
1005 (
1006 *fragment_id,
1007 actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1008 )
1009 })
1010 .collect();
1011 let resolved_split_assignment = match &plan.split_plan {
1012 ReplaceJobSplitPlan::Discovered(discovered) => {
1013 SourceManager::resolve_fragment_to_actor_splits(
1014 &plan.new_fragments,
1015 discovered,
1016 &fragment_actor_ids,
1017 )?
1018 }
1019 ReplaceJobSplitPlan::AlignFromPrevious => {
1020 SourceManager::resolve_replace_source_splits(
1021 &plan.new_fragments,
1022 &plan.replace_upstream,
1023 edges.actor_new_no_shuffle(),
1024 |_fragment_id, actor_id| {
1025 self.database_info.fragment_infos().find_map(|fragment| {
1026 fragment
1027 .actors
1028 .get(&actor_id)
1029 .map(|info| info.splits.clone())
1030 })
1031 },
1032 )?
1033 }
1034 };
1035
1036 self.database_info.pre_apply_new_fragments(
1038 plan.new_fragments
1039 .new_fragment_info(
1040 &render_result.stream_actors,
1041 &render_result.actor_location,
1042 &resolved_split_assignment,
1043 )
1044 .map(|(fragment_id, new_fragment)| {
1045 (fragment_id, plan.streaming_job.id(), new_fragment)
1046 }),
1047 );
1048 for (fragment_id, replace_map) in &plan.replace_upstream {
1049 self.database_info
1050 .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1051 }
1052 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1053 self.database_info
1054 .pre_apply_new_fragments(sinks.iter().map(|sink| {
1055 (
1056 sink.new_fragment.fragment_id,
1057 sink.original_sink.id.as_job_id(),
1058 sink.new_fragment_info(
1059 &render_result.stream_actors,
1060 &render_result.actor_location,
1061 ),
1062 )
1063 }));
1064 }
1065
1066 let (table_ids, node_actors) = self.collect_base_info();
1067
1068 let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1070 &plan,
1071 &mut edges,
1072 &self.database_info,
1073 &render_result.stream_actors,
1074 &render_result.actor_location,
1075 ));
1076
1077 let mutation = Command::replace_stream_job_to_mutation(
1080 &plan,
1081 &mut edges,
1082 &mut self.database_info,
1083 &resolved_split_assignment,
1084 )?;
1085
1086 {
1088 let mut fragment_ids_to_remove: Vec<_> = plan
1089 .old_fragments
1090 .fragments
1091 .values()
1092 .map(|f| f.fragment_id)
1093 .collect();
1094 if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1095 fragment_ids_to_remove
1096 .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1097 }
1098 self.database_info
1099 .post_apply_remove_fragments(fragment_ids_to_remove);
1100 }
1101
1102 (
1103 mutation,
1104 table_ids,
1105 actors_to_create,
1106 node_actors,
1107 PostCollectCommand::ReplaceStreamJob {
1108 plan,
1109 resolved_split_assignment,
1110 },
1111 )
1112 }
1113
1114 Some(Command::SourceChangeSplit(split_state)) => {
1115 self.database_info.pre_apply_split_assignments(
1117 split_state
1118 .split_assignment
1119 .iter()
1120 .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1121 );
1122
1123 let mutation = Some(Command::source_change_split_to_mutation(
1124 &split_state.split_assignment,
1125 ));
1126 let (table_ids, node_actors) = self.collect_base_info();
1127 (
1128 mutation,
1129 table_ids,
1130 None,
1131 node_actors,
1132 PostCollectCommand::SourceChangeSplit {
1133 split_assignment: split_state.split_assignment,
1134 },
1135 )
1136 }
1137
1138 Some(Command::CreateSubscription {
1139 subscription_id,
1140 upstream_mv_table_id,
1141 retention_second,
1142 }) => {
1143 self.database_info.register_subscriber(
1144 upstream_mv_table_id.as_job_id(),
1145 subscription_id.as_subscriber_id(),
1146 SubscriberType::Subscription(retention_second),
1147 );
1148 let mutation = Some(Command::create_subscription_to_mutation(
1149 upstream_mv_table_id,
1150 subscription_id,
1151 ));
1152 let (table_ids, node_actors) = self.collect_base_info();
1153 (
1154 mutation,
1155 table_ids,
1156 None,
1157 node_actors,
1158 PostCollectCommand::CreateSubscription { subscription_id },
1159 )
1160 }
1161
1162 Some(Command::DropSubscription {
1163 subscription_id,
1164 upstream_mv_table_id,
1165 }) => {
1166 if self
1167 .database_info
1168 .unregister_subscriber(
1169 upstream_mv_table_id.as_job_id(),
1170 subscription_id.as_subscriber_id(),
1171 )
1172 .is_none()
1173 {
1174 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1175 }
1176 let mutation = Some(Command::drop_subscription_to_mutation(
1177 upstream_mv_table_id,
1178 subscription_id,
1179 ));
1180 let (table_ids, node_actors) = self.collect_base_info();
1181 (
1182 mutation,
1183 table_ids,
1184 None,
1185 node_actors,
1186 PostCollectCommand::Command("DropSubscription".to_owned()),
1187 )
1188 }
1189
1190 Some(Command::AlterSubscriptionRetention {
1191 subscription_id,
1192 upstream_mv_table_id,
1193 retention_second,
1194 }) => {
1195 self.database_info.update_subscription_retention(
1196 upstream_mv_table_id.as_job_id(),
1197 subscription_id.as_subscriber_id(),
1198 retention_second,
1199 );
1200 self.apply_simple_command(None, "AlterSubscriptionRetention")
1201 }
1202
1203 Some(Command::ConnectorPropsChange(config)) => {
1204 let mutation = Some(Command::connector_props_change_to_mutation(&config));
1205 let (table_ids, node_actors) = self.collect_base_info();
1206 (
1207 mutation,
1208 table_ids,
1209 None,
1210 node_actors,
1211 PostCollectCommand::ConnectorPropsChange(config),
1212 )
1213 }
1214
1215 Some(Command::Refresh {
1216 table_id,
1217 associated_source_id,
1218 }) => {
1219 let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1220 self.apply_simple_command(mutation, "Refresh")
1221 }
1222
1223 Some(Command::ListFinish {
1224 table_id: _,
1225 associated_source_id,
1226 }) => {
1227 let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1228 self.apply_simple_command(mutation, "ListFinish")
1229 }
1230
1231 Some(Command::LoadFinish {
1232 table_id: _,
1233 associated_source_id,
1234 }) => {
1235 let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1236 self.apply_simple_command(mutation, "LoadFinish")
1237 }
1238
1239 Some(Command::ResetSource { source_id }) => {
1240 let mutation = Some(Command::reset_source_to_mutation(source_id));
1241 self.apply_simple_command(mutation, "ResetSource")
1242 }
1243
1244 Some(Command::ResumeBackfill { target }) => {
1245 let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1246 let (table_ids, node_actors) = self.collect_base_info();
1247 (
1248 mutation,
1249 table_ids,
1250 None,
1251 node_actors,
1252 PostCollectCommand::ResumeBackfill { target },
1253 )
1254 }
1255
1256 Some(Command::InjectSourceOffsets {
1257 source_id,
1258 split_offsets,
1259 }) => {
1260 let mutation = Some(Command::inject_source_offsets_to_mutation(
1261 source_id,
1262 &split_offsets,
1263 ));
1264 self.apply_simple_command(mutation, "InjectSourceOffsets")
1265 }
1266 };
1267
1268 let mut finished_snapshot_backfill_jobs = HashSet::new();
1269 let mutation = match mutation {
1270 Some(mutation) => Some(mutation),
1271 None => {
1272 let mut finished_snapshot_backfill_job_info = HashMap::new();
1273 if barrier_info.kind.is_checkpoint() {
1274 for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
1275 if creating_job.should_merge_to_upstream() {
1276 let info = creating_job
1277 .start_consume_upstream(partial_graph_manager, barrier_info)?;
1278 finished_snapshot_backfill_job_info
1279 .try_insert(job_id, info)
1280 .expect("non-duplicated");
1281 }
1282 }
1283 }
1284
1285 if !finished_snapshot_backfill_job_info.is_empty() {
1286 let actors_to_create = actors_to_create.get_or_insert_default();
1287 let mut subscriptions_to_drop = vec![];
1288 let mut dispatcher_update = vec![];
1289 let mut actor_splits = HashMap::new();
1290 for (job_id, info) in finished_snapshot_backfill_job_info {
1291 finished_snapshot_backfill_jobs.insert(job_id);
1292 subscriptions_to_drop.extend(
1293 info.snapshot_backfill_upstream_tables.iter().map(
1294 |upstream_table_id| PbSubscriptionUpstreamInfo {
1295 subscriber_id: job_id.as_subscriber_id(),
1296 upstream_mv_table_id: *upstream_table_id,
1297 },
1298 ),
1299 );
1300 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1301 assert_matches!(
1302 self.database_info.unregister_subscriber(
1303 upstream_mv_table_id.as_job_id(),
1304 job_id.as_subscriber_id()
1305 ),
1306 Some(SubscriberType::SnapshotBackfill)
1307 );
1308 }
1309
1310 table_ids_to_commit.extend(
1311 info.fragment_infos
1312 .values()
1313 .flat_map(|fragment| fragment.state_table_ids.iter())
1314 .copied(),
1315 );
1316
1317 let actor_len = info
1318 .fragment_infos
1319 .values()
1320 .map(|fragment| fragment.actors.len() as u64)
1321 .sum();
1322 let id_gen = GlobalActorIdGen::new(
1323 partial_graph_manager
1324 .control_stream_manager()
1325 .env
1326 .actor_id_generator(),
1327 actor_len,
1328 );
1329 let mut next_local_actor_id = 0;
1330 let actor_mapping: HashMap<_, _> = info
1332 .fragment_infos
1333 .values()
1334 .flat_map(|fragment| fragment.actors.keys())
1335 .map(|old_actor_id| {
1336 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1337 next_local_actor_id += 1;
1338 (*old_actor_id, new_actor_id.as_global_id())
1339 })
1340 .collect();
1341 let actor_mapping = &actor_mapping;
1342 let new_stream_actors: HashMap<_, _> = info
1343 .stream_actors
1344 .into_iter()
1345 .map(|(old_actor_id, mut actor)| {
1346 let new_actor_id = actor_mapping[&old_actor_id];
1347 actor.actor_id = new_actor_id;
1348 (new_actor_id, actor)
1349 })
1350 .collect();
1351 let new_fragment_info: HashMap<_, _> = info
1352 .fragment_infos
1353 .into_iter()
1354 .map(|(fragment_id, mut fragment)| {
1355 let actors = take(&mut fragment.actors);
1356 fragment.actors = actors
1357 .into_iter()
1358 .map(|(old_actor_id, actor)| {
1359 let new_actor_id = actor_mapping[&old_actor_id];
1360 (new_actor_id, actor)
1361 })
1362 .collect();
1363 (fragment_id, fragment)
1364 })
1365 .collect();
1366 actor_splits.extend(
1367 new_fragment_info
1368 .values()
1369 .flat_map(|fragment| &fragment.actors)
1370 .map(|(actor_id, actor)| {
1371 (
1372 *actor_id,
1373 ConnectorSplits {
1374 splits: actor
1375 .splits
1376 .iter()
1377 .map(ConnectorSplit::from)
1378 .collect(),
1379 },
1380 )
1381 }),
1382 );
1383 let partial_graph_id = to_partial_graph_id(self.database_id, None);
1385 let mut edge_builder = FragmentEdgeBuilder::new(
1386 info.upstream_fragment_downstreams
1387 .keys()
1388 .map(|upstream_fragment_id| {
1389 self.database_info.fragment(*upstream_fragment_id)
1390 })
1391 .chain(new_fragment_info.values())
1392 .map(|fragment| {
1393 (
1394 fragment.fragment_id,
1395 EdgeBuilderFragmentInfo::from_inflight(
1396 fragment,
1397 partial_graph_id,
1398 partial_graph_manager.control_stream_manager(),
1399 ),
1400 )
1401 }),
1402 );
1403 edge_builder.add_relations(&info.upstream_fragment_downstreams);
1404 edge_builder.add_relations(&info.downstreams);
1405 let mut edges = edge_builder.build();
1406 let new_actors_to_create = edges.collect_actors_to_create(
1407 new_fragment_info.values().map(|fragment| {
1408 (
1409 fragment.fragment_id,
1410 &fragment.nodes,
1411 fragment.actors.iter().map(|(actor_id, actor)| {
1412 (&new_stream_actors[actor_id], actor.worker_id)
1413 }),
1414 [], )
1416 }),
1417 );
1418 dispatcher_update.extend(
1419 info.upstream_fragment_downstreams.keys().flat_map(
1420 |upstream_fragment_id| {
1421 let new_actor_dispatchers = edges
1422 .dispatchers
1423 .remove(upstream_fragment_id)
1424 .expect("should exist");
1425 new_actor_dispatchers.into_iter().flat_map(
1426 |(upstream_actor_id, dispatchers)| {
1427 dispatchers.into_iter().map(move |dispatcher| {
1428 PbDispatcherUpdate {
1429 actor_id: upstream_actor_id,
1430 dispatcher_id: dispatcher.dispatcher_id,
1431 hash_mapping: dispatcher.hash_mapping,
1432 removed_downstream_actor_id: dispatcher
1433 .downstream_actor_id
1434 .iter()
1435 .map(|new_downstream_actor_id| {
1436 actor_mapping
1437 .iter()
1438 .find_map(
1439 |(old_actor_id, new_actor_id)| {
1440 (new_downstream_actor_id
1441 == new_actor_id)
1442 .then_some(*old_actor_id)
1443 },
1444 )
1445 .expect("should exist")
1446 })
1447 .collect(),
1448 added_downstream_actor_id: dispatcher
1449 .downstream_actor_id,
1450 }
1451 })
1452 },
1453 )
1454 },
1455 ),
1456 );
1457 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1458 for (worker_id, worker_actors) in new_actors_to_create {
1459 node_actors.entry(worker_id).or_default().extend(
1460 worker_actors.values().flat_map(|(_, actors, _)| {
1461 actors.iter().map(|(actor, _, _)| actor.actor_id)
1462 }),
1463 );
1464 actors_to_create
1465 .entry(worker_id)
1466 .or_default()
1467 .extend(worker_actors);
1468 }
1469 self.database_info.add_existing(InflightStreamingJobInfo {
1470 job_id,
1471 fragment_infos: new_fragment_info,
1472 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
1474 cdc_table_backfill_tracker: None, });
1476 }
1477
1478 Some(PbMutation::Update(PbUpdateMutation {
1479 dispatcher_update,
1480 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
1484 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
1488 }))
1489 } else {
1490 let fragment_ids = self.database_info.take_pending_backfill_nodes();
1491 if fragment_ids.is_empty() {
1492 None
1493 } else {
1494 Some(PbMutation::StartFragmentBackfill(
1495 PbStartFragmentBackfillMutation { fragment_ids },
1496 ))
1497 }
1498 }
1499 }
1500 };
1501
1502 for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
1504 if !finished_snapshot_backfill_jobs.contains(job_id) {
1505 let throttle_mutation = if let Some((ref jobs, ref config)) =
1506 throttle_for_creating_jobs
1507 && jobs.contains(job_id)
1508 {
1509 assert_eq!(
1510 jobs.len(),
1511 1,
1512 "should not alter rate limit of snapshot backfill job with other jobs"
1513 );
1514 Some((
1515 Mutation::Throttle(ThrottleMutation {
1516 fragment_throttle: config
1517 .iter()
1518 .map(|(fragment_id, config)| (*fragment_id, *config))
1519 .collect(),
1520 }),
1521 take(notifiers),
1522 ))
1523 } else {
1524 None
1525 };
1526 creating_job.on_new_upstream_barrier(
1527 partial_graph_manager,
1528 barrier_info,
1529 throttle_mutation,
1530 )?;
1531 }
1532 }
1533
1534 partial_graph_manager.inject_barrier(
1535 to_partial_graph_id(self.database_id, None),
1536 mutation,
1537 barrier_info,
1538 &node_actors,
1539 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1540 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1541 actors_to_create,
1542 )?;
1543
1544 Ok(ApplyCommandInfo {
1545 mv_subscription_max_retention: self.database_info.max_subscription_retention(),
1546 table_ids_to_commit,
1547 jobs_to_wait: finished_snapshot_backfill_jobs,
1548 command: post_collect_command,
1549 })
1550 }
1551}