use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    ConnectorPropsChange, FragmentBackfillOrder, JobReschedulePostUpdates, SplitAssignment,
    ThrottleConfig, build_actor_connector_splits,
};

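/// Describes the changes applied to a single fragment by a reschedule (scaling or actor
/// migration): which actors are added or removed, how vnode bitmaps change, and what
/// updates the upstream and downstream fragments need.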
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers in upstream fragments that point to this fragment and must be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for the upstream dispatchers, if this fragment is hash-distributed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose `Merge` nodes must be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned source splits per actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, with their dispatchers and the worker they are scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

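/// Plan for replacing the fragments of a streaming job in place (e.g. `ALTER TABLE`, or
/// creating a sink into an existing table): the old fragments are dropped and the new
/// fragments take over their upstreams and downstreams at a single barrier.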
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream fragments that must rewrite their `Merge` nodes from an old upstream
    /// fragment to the new fragment that replaces it.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the new fragments.
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    /// Temporary id assigned to the new fragments while the replacement is in progress.
    pub tmp_id: u32,
    /// State tables of the old fragments to unregister after the replacement.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

    /// Maps each replaced (old) fragment id to the new fragment id that takes its place.
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            let r =
                fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
            if let Some(r) = r {
                assert_eq!(
                    *new_upstream_fragment_id, r,
                    "one fragment is replaced by multiple fragments"
                );
            }
        }
        fragment_replacements
    }
}

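/// Everything needed to create a new streaming job: the fragments to build, their
/// relations to upstream fragments, the initial split assignment, and catalog metadata.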
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

impl StreamJobFragments {
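    /// Builds the in-flight fragment info for all fragments of this (new) job, including
    /// per-actor worker assignment and vnode bitmaps, keyed by fragment id.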
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

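/// Information for a snapshot-backfill job: the upstream MV tables to backfill from,
/// each with its pinned backfill epoch.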
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream MV table ids to backfill from, each mapped to its backfill epoch
    /// (`None` until the epoch is determined).
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

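/// How a new streaming job is brought online.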
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

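/// A command to be executed by the global barrier manager. Each command is bound to a
/// barrier: the corresponding mutation is injected into the dataflow graph together with
/// the barrier, and the command finishes once the barrier is collected from all actors.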
#[derive(Debug)]
pub enum Command {
    /// Forces a checkpoint on the next barrier, without any other effect.
    Flush,

    /// Pauses all streaming dataflows.
    Pause,

    /// Resumes all streaming dataflows paused by [`Command::Pause`].
    Resume,

    /// Drops the given streaming jobs by stopping all of their actors and
    /// unregistering their fragments and state tables.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// Creates a new streaming job: builds its actors on the workers and adds new
    /// dispatchers to the upstream fragments via an `Add` mutation.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Merges finished snapshot-backfill jobs back into the main graph by dropping
    /// their temporary subscriptions on the upstream MVs.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// Reschedules (scales) fragments: adds and removes actors, and updates the
    /// affected dispatchers, merges, vnode bitmaps and source splits.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    /// Replaces the fragments of an existing streaming job in place. See
    /// [`ReplaceStreamJobPlan`] for details.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Reassigns source splits to source actors.
    SourceChangeSplit(SplitAssignment),

    /// Updates per-actor rate limits.
    Throttle(ThrottleConfig),

    /// Creates a subscription on an upstream materialized view, retaining its log
    /// store for `retention_second` seconds.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drops a subscription from an upstream materialized view.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    /// Changes connector properties of sources or sinks at runtime.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Starts backfill for fragments whose backfill was deferred by the backfill
    /// ordering of their job.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    table_fragments_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit(_) => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

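    /// Builds the command to cancel a creating streaming job by dropping all of its
    /// fragments and actors and unregistering its state tables.
    ///
    /// Illustrative usage (assumes a `StreamJobFragments` value at hand):
    /// ```ignore
    /// let cmd = Command::cancel(&stream_job_fragments);
    /// assert!(matches!(cmd, Command::DropStreamingJobs { .. }));
    /// ```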
    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

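    /// Returns the fragment-level changes this command applies to the in-flight graph
    /// info, or `None` if the command does not change the set of fragments.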
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // Skip newly created actors; their bitmaps are
                                        // already carried in `new_actors`.
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
        }
    }

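    /// Whether this command requires the barrier that carries it to be a checkpoint
    /// barrier. Currently everything except `Resume` forces a checkpoint.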
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

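/// The kind of barrier to inject into the dataflow graph.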
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The first barrier injected after recovery or startup.
    Initial,
    /// A regular barrier that does not commit a checkpoint.
    Barrier,
    /// A checkpoint barrier, carrying the prev epochs of all barriers since the last
    /// checkpoint (including itself).
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

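/// [`CommandContext`] bundles a barrier with the command it carries (if any), plus the
/// metadata needed to post-process the barrier once it has been collected.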
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of the barrier. It is kept alive for the lifetime of the
    /// command context so the whole barrier lifecycle stays under one span.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={command}")?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

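    /// Computes the epoch below which a subscribed MV's log store may be truncated,
    /// i.e. `prev_epoch - retention_second`. For example, with `prev_epoch` at physical
    /// time `t` and `retention_second = 60`, the result corresponds to `t - 60s`. Falls
    /// back to `prev_epoch` itself if the subtraction yields an invalid timestamp.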
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

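    /// Assembles the [`CommitEpochInfo`] for this checkpoint barrier from the barrier
    /// responses of all workers: synced SSTs, table watermarks, new table fragment
    /// infos, and the change-log deltas with their truncate epochs.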
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // A new truncate epoch for the same table only takes effect if it is smaller,
        // so the log store keeps enough history for the most demanding reader.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

impl Command {
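    /// Generates the barrier mutation to broadcast to compute nodes for this command,
    /// or `None` if the barrier should carry no mutation.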
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only inject a pause mutation if the dataflow is not already paused.
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only inject a resume mutation if the dataflow is actually paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // Newly added actors start paused if the dataflow is currently paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Update the dispatcher of every actor in the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // Upstream actors that are about to be removed do not need
                            // to learn about the added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| ActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // A reschedule adds no new dispatchers; existing ones are modified via
                // `dispatcher_update` instead.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
        }
    }

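    /// Returns the new actors this command requires to be built, grouped by worker and
    /// fragment, or `None` if the command creates no actors.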
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // Actors for snapshot backfill are created separately.
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

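    /// Builds the `Update` mutation that switches downstreams from the old fragments to
    /// the new ones when replacing a stream job: drops the old actors, installs the new
    /// dispatchers and merge updates, and applies the initial split assignment.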
    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

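    /// Returns the ids of the streaming jobs (tables) dropped by this command, if any.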
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
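    /// Collects, for each downstream actor, the upstream actors it consumes from (with
    /// their host addresses), based on the dispatchers of the given upstream actors
    /// and, for reschedules, the dispatcher updates of surviving upstream actors.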
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors that are removed by the reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}