use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::iter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
    JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, UpstreamSinkInfo,
    build_actor_connector_splits,
};

#[derive(Debug, Clone)]
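/// The per-fragment plan of a reschedule: which actors are added on which workers, which
/// actors are removed, how vnode bitmaps and upstream dispatchers are updated, and the new
/// source / CDC snapshot split assignments for the fragment after the reschedule.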
pub struct Reschedule {
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    pub removed_actors: HashSet<ActorId>,

    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    pub downstream_fragment_ids: Vec<FragmentId>,

    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    pub cdc_table_id: Option<u32>,
}

#[derive(Debug, Clone)]
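/// Plan for replacing the fragments of an existing streaming job (`old_fragments`) with a
/// newly built set (`new_fragments`), rewiring downstream fragments via `replace_upstream`
/// and carrying the split assignments and state tables affected by the replacement.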
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub tmp_id: u32,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id().into(),
                info: new_fragment,
                is_existing: false,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: TableId::new(sink.original_sink.id as _),
                    info: sink.new_fragment_info(),
                    is_existing: false,
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

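/// Metadata carried by [`Command::CreateStreamingJob`]: the fragments to create, their split
/// assignments and backfill ordering, plus the catalog information of the job being created.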
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl StreamJobFragments {
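    /// Builds an [`InflightFragmentInfo`] for every fragment of this job, resolving each
    /// actor's worker from `actor_status` and collecting the fragment's state table ids.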
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

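/// Snapshot-backfill information for a job: the upstream materialized view tables to backfill
/// from, each mapped to the epoch of the backfill snapshot (`None` until it is determined).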
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

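/// How a streaming job is created: as a normal job, as a sink into an existing table, or via
/// snapshot backfill from upstream materialized views.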
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

#[derive(Debug)]
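/// The action attached to a barrier. A command is generated on the meta node, injected into
/// the dataflow graph alongside a barrier, and translated into a [`Mutation`] that streaming
/// actors apply when they receive the barrier.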
pub enum Command {
    Flush,

    Pause,

    Resume,

    DropStreamingJobs {
        streaming_job_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    ReplaceStreamJob(ReplaceStreamJobPlan),

    SourceChangeSplit(SplitAssignment),

    Throttle(ThrottleConfig),

    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit(_) => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

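    /// Returns the changes this command applies to the set of in-flight fragments, or `None`
    /// if the command does not change the fragment topology.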
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some(changes)
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id().into(),
                                info: fragment_info,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
            Command::Refresh { .. } => None,
            Command::LoadFinish { .. } => None,
        }
    }

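    /// Whether the barrier carrying this command must be a checkpoint barrier. Currently
    /// every command except `Resume` requires a checkpoint.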
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

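/// The kind of a barrier: the very first barrier after recovery (`Initial`), a regular
/// non-checkpoint barrier, or a checkpoint barrier carrying the epochs it covers (used when
/// building the table change log delta).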
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

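/// The context of an in-flight barrier: the barrier itself, the subscription info it was
/// issued under, the state tables whose epochs it commits, and the optional command it
/// carries.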
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

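    /// Computes the epoch corresponding to `retention_second` seconds before the previous
    /// epoch, used as a truncation watermark; falls back to the previous epoch itself if the
    /// resulting timestamp is out of range.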
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

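    /// Aggregates the barrier-complete responses from all compute nodes into `info`, which is
    /// later committed to hummock: synced SSTs, table watermarks, new table fragment infos,
    /// change log deltas (with log store truncation driven by subscriptions and pinned
    /// backfill epochs), and vector index deltas.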
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts, vector_index_adds) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id.into(),
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
    }
}

impl Command {
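    /// Turns this command into the [`Mutation`] that will be attached to the barrier and
    /// applied by the streaming actors, or `None` if no mutation is needed.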
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        cdc_table_snapshot_split_assignment,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = table_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink.clone(),
                        )])
                    } else {
                        HashMap::new()
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits:
                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
                            cdc_table_snapshot_split_assignment.clone(),
                        )
                        .into(),
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                cdc_table_snapshot_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let cdc_table_snapshot_split_assignment =
                    if cdc_table_snapshot_split_assignment.is_empty() {
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                    } else {
                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
                            cdc_table_snapshot_split_assignment.clone(),
                            control_stream_manager
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
                        )
                    };
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().into_iter().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    cdc_table_snapshot_split_assignment,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| PbActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                let mut cdc_table_ids: HashSet<_> = HashSet::default();
                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                    actor_cdc_table_snapshot_splits.extend(
                        build_pb_actor_cdc_table_snapshot_splits(
                            reschedule.cdc_table_snapshot_split_assignment.clone(),
                        ),
                    );
                    if let Some(cdc_table_id) = reschedule.cdc_table_id {
                        cdc_table_ids.insert(cdc_table_id);
                    }
                }

                let actor_new_dispatchers = HashMap::new();
                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
                {
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
                    )
                    .into()
                } else {
                    PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                        generation: control_stream_manager
                            .env
                            .cdc_table_backfill_tracker
                            .next_generation(cdc_table_ids.into_iter()),
                    }
                    .into()
                };
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                    sink_add_columns: Default::default(),
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

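    /// Returns the actors that must be newly built for this command, grouped by worker and
    /// fragment, or `None` if the command creates no actors.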
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

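    /// Builds the [`UpdateMutation`] used when replacing a streaming job: dropping the old
    /// actors, rewiring merges and dispatchers to the new fragments, and carrying the new
    /// split assignments and sink column additions.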
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    cdc_table_snapshot_split_assignment,
                )
                .into(),
            sink_add_columns: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id,
                            PbSinkAddColumns {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

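    /// The ids of the streaming jobs dropped by this command, if it is `DropStreamingJobs`.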
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
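    /// Collects, for every downstream actor affected by this command, the upstream actors
    /// (with their host addresses) it should register: first from the dispatchers of newly
    /// created upstream actors, then, for reschedules, from the surviving upstream actors.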
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}