use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
    UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    ConnectorPropsChange, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
    build_actor_connector_splits,
};

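/// The reschedule plan of a single fragment, used by [`Command::RescheduleFragment`] for scaling
/// and actor migration.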
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Updated vnode bitmaps for the actors of this fragment that are kept after rescheduling.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment and the dispatchers in them that need updating.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatchers, if this fragment is hash-distributed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned source splits for the actors of this fragment.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, together with their dispatchers and the worker they are placed on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

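/// Plan for replacing the fragments of an existing streaming job with new ones while keeping its
/// downstream consumers working, e.g. when a sink is created into an existing table.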
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream fragments whose `Merge` nodes should switch to the new upstream fragments.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The split assignment for source actors of the new fragments.
    pub init_split_assignment: SplitAssignment,
    /// The streaming job being replaced.
    pub streaming_job: StreamingJob,
    /// The temporary id assigned to the new fragments during the replacement.
    pub tmp_id: u32,
    /// State tables that become unused after the replacement and should be unregistered.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

    /// Map from the fragment id being replaced to the new fragment id that replaces it.
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

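/// The info needed to create a streaming job, carried by [`Command::CreateStreamingJob`].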
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

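/// Info about the upstream materialized views that a snapshot-backfill job reads from.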
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized view table ids, each mapped to the epoch at which its snapshot
    /// backfill starts (`None` until the epoch is determined).
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

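/// How a streaming job is created: as a normal job, as a sink into an existing table (which also
/// replaces the table's fragments), or via snapshot backfill on the upstream materialized views.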
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

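/// `Command` describes what the barrier worker should do along with the next barrier: it decides
/// the [`Mutation`] carried by the barrier (see [`Command::to_mutation`]) and the bookkeeping to
/// perform after the barrier is collected, e.g. what to commit to hummock.
///
/// A minimal sketch of how a command is typically paired with a barrier (names are illustrative):
///
/// ```ignore
/// let command = Command::cancel(&stream_job_fragments);
/// let mutation = command.to_mutation(is_paused, &mut edges, &control_stream_manager);
/// ```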
#[derive(Debug, strum::Display)]
pub enum Command {
    /// `Flush` injects a barrier that carries no mutation, forcing a checkpoint.
    Flush,

    /// `Pause` injects a [`PauseMutation`] to pause the dataflow, if it is not already paused.
    Pause,

    /// `Resume` injects a [`ResumeMutation`] to resume a paused dataflow.
    Resume,

    /// `DropStreamingJobs` stops the actors of the given jobs with a [`StopMutation`] and
    /// unregisters their state tables and fragments.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` adds the actors and dispatchers of a new streaming job with an
    /// [`AddMutation`]. For a sink into an existing table, it additionally carries the
    /// [`UpdateMutation`] of the replacement plan in a [`CombinedMutation`].
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Finishes snapshot-backfill jobs by dropping the temporary subscriptions they hold on
    /// their upstream materialized views.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `RescheduleFragment` applies a reschedule plan with an [`UpdateMutation`] that updates
    /// dispatchers, merge executors, vnode bitmaps and source splits.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Should contain the actor ids of the upstream and downstream fragments of the
        /// rescheduled fragments.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` swaps the fragments of an existing streaming job with the new ones
    /// described in the [`ReplaceStreamJobPlan`].
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` reassigns source splits to source actors.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` updates the rate limit of the given actors.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` registers a subscription on an upstream materialized view.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` unregisters a subscription from an upstream materialized view.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    /// `ConnectorPropsChange` updates connector properties at runtime.
    ConnectorPropsChange(ConnectorPropsChange),
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

    /// Returns the changes to the in-flight fragment info implied by this command, or `None` if
    /// the command does not change any fragment.
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // Bitmaps of newly created actors are already covered
                                        // by `new_actors` above.
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // Currently, all commands except `Resume` require a checkpoint barrier.
        !matches!(self, Command::Resume)
    }
}

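/// The kind of barrier to inject: the initial barrier after startup or recovery, a plain barrier,
/// or a checkpoint barrier.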
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier, carrying the epochs that this checkpoint covers.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

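/// The context of a command: the command itself together with the info of the barrier that
/// carries it.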
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command, held here so that it stays open for as long as the
    /// command is in flight.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

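    /// Compute the epoch that the log store of a subscribed materialized view can be truncated
    /// to, i.e. the prev epoch's timestamp minus the retention period. Falls back to the prev
    /// epoch itself if the subtraction yields an invalid timestamp.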
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

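    /// Collect the info needed to commit this barrier's epoch to hummock: synced SSTs, table
    /// watermarks, new table fragments, and the change-log deltas (with per-MV truncate epochs
    /// derived from subscriptions and pinned backfills).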
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // Keep the smallest truncate epoch for each subscribed MV, across all of its
        // subscriptions and pinned backfill epochs.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

impl Command {
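    /// Generate the mutation that the barrier of this command should carry, if any.
    ///
    /// For example, `Command::Pause` yields a [`PauseMutation`] only when the dataflow is not
    /// already paused, while `Command::Flush` carries no mutation at all.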
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only inject a pause when the cluster is not already paused.
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only inject a resume when the cluster is currently paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is currently paused, the new actors should start paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Every actor of the upstream fragment carries the dispatcher update.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // Upstream actors that are about to be removed should not add the
                            // newly created downstream actors to their dispatchers.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Actors of the downstream fragment that survive the reschedule carry
                        // the merge update.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Collect the actors removed from the downstream fragment, which should
                        // be skipped below.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| ActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record the updated vnode bitmaps of the existing actors.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // Newly created actors already carry their dispatchers when they are built
                // (see `actors_to_create`), so there is nothing to add here.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
        }
    }

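    /// Collect the actors to be created for this command, grouped by worker and fragment.
    /// Returns `None` if the command creates no actors here (note that actors of
    /// snapshot-backfill jobs are not created through this path).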
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

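    /// Build the [`UpdateMutation`] that switches downstream `Merge` nodes to the new fragments
    /// and drops the actors of the old fragments of a replaced streaming job.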
    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

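    /// Returns the ids of the streaming jobs (table fragments) that this command drops.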
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
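    /// Collect, for each (newly created) downstream actor, the upstream actors it should receive
    /// data from, derived from the upstream dispatchers and, for reschedules, the added and
    /// removed actors of the involved fragments.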
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}