use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    ConnectorPropsChange, FragmentBackfillOrder, JobReschedulePostUpdates, SplitAssignment,
    ThrottleConfig, build_actor_connector_splits,
};

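/// [`Reschedule`] describes how the actors of one fragment change when the fragment is
/// rescheduled, e.g. scaled in/out or migrated to other workers.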
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps for actors of this fragment whose vnode assignment changes.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers in the upstream fragments that need to be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for the upstream dispatchers, if this fragment is hash-distributed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors need to be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned source splits for the actors of this fragment.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, together with the worker each of them is scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

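/// Plan for replacing the fragments of an existing streaming job with freshly built ones,
/// used by `ALTER`-style statements and by creating a sink into an existing table.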
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream fragments that should switch their upstream from an old fragment to a new one.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The source split assignment for the actors of the new fragments.
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    /// Temporary id assigned to the new job fragments while they are being created.
    pub tmp_id: u32,
    /// State tables to be unregistered once the replacement takes effect.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

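    /// Maps each replaced upstream fragment id to the new fragment id that takes its place.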
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

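/// All the information the barrier manager needs to create a streaming job: the fragments to
/// build, their relations to upstream fragments, the initial source split assignment, and the
/// job metadata.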
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

impl StreamJobFragments {
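    /// Builds the in-flight fragment info for every fragment of this job, which is used to
    /// register the job in the barrier manager's in-memory graph.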
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

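/// Information about the upstream materialized views that a snapshot-backfill job depends on.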
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized view table id, mapped to the epoch from which backfill starts
    /// once that epoch has been determined (`None` until then).
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

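/// How a new streaming job is created, which determines how its barriers are handled.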
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

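/// [`Command`] is the action attached to a barrier. Most commands carry a [`Mutation`] for the
/// stream graph (see [`Command::to_mutation`]) and determine the bookkeeping performed when the
/// barrier is collected and committed.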
#[derive(Debug, strum::Display)]
pub enum Command {
    /// `Flush` forces a barrier with no mutation, so that in-flight data gets committed.
    Flush,

    /// `Pause` generates a [`PauseMutation`] to pause the stream graph if it is not already paused.
    Pause,

    /// `Resume` generates a [`ResumeMutation`] to resume a previously paused stream graph.
    Resume,

    /// `DropStreamingJobs` drops the given streaming jobs: the listed actors are stopped and the
    /// associated state tables and fragments are unregistered.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` creates the actors of a new streaming job and adds them to the
    /// stream graph via an [`AddMutation`].
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Issued when snapshot-backfill jobs finish backfilling; it drops the temporary
    /// subscriptions they held on the upstream materialized views.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `RescheduleFragment` applies a set of per-fragment [`Reschedule`] plans, adding, removing
    /// or migrating actors.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Actors of the upstream and downstream fragments of the rescheduled fragments, used to
        /// build dispatcher and merge updates.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` replaces the fragments of an existing job according to a
    /// [`ReplaceStreamJobPlan`].
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` reassigns source splits to source actors.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` updates the rate limit of the given actors.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` registers a subscription on an upstream materialized view.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` unregisters a subscription.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `StartFragmentBackfill` resumes backfill for fragments that were held back by the
    /// backfill ordering.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

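    /// Builds a [`Command::DropStreamingJobs`] that cancels the given (possibly still creating)
    /// streaming job, stopping all of its actors and unregistering its state tables and fragments.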
    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

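    /// Returns the changes this command applies to the set of in-flight fragments, or `None` if
    /// the command does not change the fragment graph.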
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // Newly created actors' bitmaps are already carried by
                                        // `new_actors` above.
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // All commands except `Resume` require a checkpoint barrier.
        !matches!(self, Command::Resume)
    }
}

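/// The kind of barrier. A checkpoint barrier additionally carries the epochs it covers since the
/// previous checkpoint.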
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

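/// [`CommandContext`] bundles a [`Command`] (if any) with the barrier it is attached to. It is
/// used when injecting the barrier and again when the barrier completes, e.g. to collect the
/// commit info for the epoch.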
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of the command, kept alive for the lifetime of the command so that the
    /// barrier-related spans remain attributed to it.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

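    /// Computes the epoch up to which the log store of a subscribed materialized view may be
    /// truncated: roughly the wall-clock time of `prev_epoch` minus `retention_second`, falling
    /// back to `prev_epoch` itself if the subtraction yields an invalid timestamp.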
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

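    /// Collects everything needed to commit this barrier's epoch to hummock: synced SSTs, table
    /// watermarks, newly created table fragments, and per-table change log deltas together with
    /// the truncation epochs derived from subscriptions and pinned backfill epochs.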
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // Keep the smallest truncate epoch for each upstream mv, so that no subscriber or
        // backfilling job loses log entries it still needs.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

impl Command {
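    /// Generates the [`Mutation`] to be carried by the barrier of this command, or `None` if no
    /// mutation is needed (e.g. `Flush`, or `Pause` while the cluster is already paused).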
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is currently paused, the new actors should start paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // All actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Keyed by `(actor_id, dispatcher_id)` so that each dispatcher is
                            // updated exactly once.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // All actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Actors being removed from the downstream fragment should not receive
                        // merge updates.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Keyed by `(actor_id, fragment_id)` so that each merge executor is
                            // updated exactly once.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| ActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // No new dispatchers are created in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
        }
    }

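    /// Returns the new actors this command requires, grouped by worker and fragment, together
    /// with each actor's upstream info and dispatchers. Returns `None` if the command does not
    /// create actors through this path.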
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

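    /// Builds the [`UpdateMutation`] that switches downstream merge executors from the old
    /// fragments to the new ones, drops the old actors, and installs the new dispatchers and
    /// source splits.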
    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

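    /// Returns the ids of the streaming jobs dropped by this command; empty for other commands.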
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
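    /// Collects, for each downstream actor, the upstream actors (per upstream fragment) it
    /// should consume from, including each upstream actor's host. For reschedules, newly added
    /// upstream actors are included and removed ones are skipped.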
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}