1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::ActorMapping;
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::SplitImpl;
29use risingwave_connector::source::cdc::{
30 CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
31 build_pb_actor_cdc_table_snapshot_splits,
32 build_pb_actor_cdc_table_snapshot_splits_with_generation,
33};
34use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
35use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::catalog::CreateType;
38use risingwave_pb::catalog::table::PbTableType;
39use risingwave_pb::common::PbActorInfo;
40use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
41use risingwave_pb::source::{
42 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
43};
44use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
45use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
46use risingwave_pb::stream_plan::barrier_mutation::Mutation;
47use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
48use risingwave_pb::stream_plan::stream_node::NodeBody;
49use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
50use risingwave_pb::stream_plan::update_mutation::*;
51use risingwave_pb::stream_plan::{
52 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
53 LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo, ResumeMutation,
54 SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
55 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
56};
57use risingwave_pb::stream_service::BarrierCompleteResponse;
58use tracing::warn;
59
60use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
61use crate::barrier::InflightSubscriptionInfo;
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73 StreamJobFragments, StreamJobFragmentsToCreate,
74};
75use crate::stream::{
76 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
77 JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
78 build_actor_connector_splits,
79};
80
/// The changes applied to a single fragment by a [`Command::RescheduleFragment`].
///
/// One `Reschedule` is stored per rescheduled fragment, keyed by its `FragmentId`.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps keyed by actor. May contain entries for newly created actors;
    /// `Command::fragment_changes` filters those out and keeps only pre-existing actors.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher)` pairs for which a `DispatcherUpdate` is generated
    /// for every actor of the upstream fragment.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Optional actor mapping sent as the `hash_mapping` of each upstream `DispatcherUpdate`.
    // NOTE(review): presumably only set for hash-distributed dispatchers — confirm.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments for which a `MergeUpdate` is generated for every remaining actor.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Connector splits per actor, forwarded as `actor_splits` of the update mutation.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors created by this reschedule, each paired with a `WorkerId`.
    /// Every actor listed in `added_actors` is expected to have an entry here.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    /// CDC table snapshot split assignment merged into the update mutation.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    /// CDC table id, if any; collected to request the next split-assignment generation.
    pub cdc_table_id: Option<u32>,
}
116
/// Everything needed to replace the fragments of an existing streaming job with new ones.
///
/// Carried by [`Command::ReplaceStreamJob`].
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job before replacement; each is reported as removed.
    pub old_fragments: StreamJobFragments,
    /// Fragments of the job after replacement; each is reported as a new fragment.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Per-fragment upstream replacement maps (old upstream fragment -> new upstream fragment),
    /// keyed by the fragment whose upstream is replaced.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Upstream fragments whose dispatchers are extracted when building the mutation.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the new fragments.
    pub init_split_assignment: SplitAssignment,
    /// The streaming job being replaced; its id is used as the job id of the new fragments.
    pub streaming_job: StreamingJob,
    // NOTE(review): presumably the temporary id under which the replacement job is built — confirm.
    pub tmp_id: u32,
    // NOTE(review): presumably state tables of the old job that become obsolete — confirm.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose fragment is replaced together with the job: the original fragment is removed
    /// and a new fragment is added under the original sink id.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    /// CDC table snapshot split assignment; wrapped with a fresh generation when non-empty.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
146
147impl ReplaceStreamJobPlan {
148 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
149 let mut fragment_changes = HashMap::new();
150 for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
151 let fragment_change = CommandFragmentChanges::NewFragment {
152 job_id: self.streaming_job.id().into(),
153 info: new_fragment,
154 is_existing: false,
155 };
156 fragment_changes
157 .try_insert(fragment_id, fragment_change)
158 .expect("non-duplicate");
159 }
160 for fragment in self.old_fragments.fragments.values() {
161 fragment_changes
162 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
163 .expect("non-duplicate");
164 }
165 for (fragment_id, replace_map) in &self.replace_upstream {
166 fragment_changes
167 .try_insert(
168 *fragment_id,
169 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
170 )
171 .expect("non-duplicate");
172 }
173 if let Some(sinks) = &self.auto_refresh_schema_sinks {
174 for sink in sinks {
175 let fragment_change = CommandFragmentChanges::NewFragment {
176 job_id: TableId::new(sink.original_sink.id as _),
177 info: sink.new_fragment_info(),
178 is_existing: false,
179 };
180 fragment_changes
181 .try_insert(sink.new_fragment.fragment_id, fragment_change)
182 .expect("non-duplicate");
183 fragment_changes
184 .try_insert(
185 sink.original_fragment.fragment_id,
186 CommandFragmentChanges::RemoveFragment,
187 )
188 .expect("non-duplicate");
189 }
190 }
191 fragment_changes
192 }
193
194 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
196 let mut fragment_replacements = HashMap::new();
197 for (upstream_fragment_id, new_upstream_fragment_id) in
198 self.replace_upstream.values().flatten()
199 {
200 {
201 let r =
202 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
203 if let Some(r) = r {
204 assert_eq!(
205 *new_upstream_fragment_id, r,
206 "one fragment is replaced by multiple fragments"
207 );
208 }
209 }
210 }
211 fragment_replacements
212 }
213}
214
/// Information describing a streaming job to be created, carried by
/// [`Command::CreateStreamingJob`].
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the job to create. Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Upstream fragments whose dispatchers are extracted into the add mutation.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment, converted to per-actor connector splits.
    pub init_split_assignment: SplitAssignment,
    // NOTE(review): presumably the SQL definition of the job — confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    /// The job being created; its id becomes the job id of the new fragments.
    pub streaming_job: StreamingJob,
    /// Backfill ordering between fragments; used to derive the backfill nodes to pause.
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    /// CDC table snapshot split assignment (with generation) sent in the add mutation.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}
229
230impl StreamJobFragments {
231 pub(super) fn new_fragment_info(
232 &self,
233 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
234 self.fragments.values().map(|fragment| {
235 (
236 fragment.fragment_id,
237 InflightFragmentInfo {
238 fragment_id: fragment.fragment_id,
239 distribution_type: fragment.distribution_type.into(),
240 nodes: fragment.nodes.clone(),
241 actors: fragment
242 .actors
243 .iter()
244 .map(|actor| {
245 (
246 actor.actor_id,
247 InflightActorInfo {
248 worker_id: self
249 .actor_status
250 .get(&actor.actor_id)
251 .expect("should exist")
252 .worker_id()
253 as WorkerId,
254 vnode_bitmap: actor.vnode_bitmap.clone(),
255 },
256 )
257 })
258 .collect(),
259 state_table_ids: fragment
260 .state_table_ids
261 .iter()
262 .map(|table_id| TableId::new(*table_id))
263 .collect(),
264 },
265 )
266 })
267 }
268}
269
/// Upstream tables involved in a snapshot backfill.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized-view table id -> optional backfill epoch.
    // NOTE(review): presumably `None` means the backfill epoch is not decided yet — confirm.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
277
/// How a streaming job is being created; see [`Command::CreateStreamingJob`].
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// No extra handling.
    Normal,
    /// The job is a sink into a table: the sink's downstream fragment additionally gets the
    /// sink fragment registered as a new upstream.
    SinkIntoTable(UpstreamSinkInfo),
    /// The job is created with snapshot backfill: subscriptions on the listed upstream tables
    /// are added, and fragment changes are handled outside `Command::fragment_changes`.
    SnapshotBackfill(SnapshotBackfillInfo),
}
284
/// A command attached to a barrier. Each variant maps to an optional barrier mutation
/// (`Command::to_mutation`) and an optional set of fragment changes
/// (`Command::fragment_changes`).
#[derive(Debug)]
pub enum Command {
    /// Carries no mutation and no fragment changes.
    Flush,

    /// Emits a `PauseMutation`, unless the stream is already paused.
    Pause,

    /// Emits a `ResumeMutation`, but only if the stream is currently paused.
    /// This is the only command for which `need_checkpoint` returns `false`.
    Resume,

    /// Drops streaming jobs: stops their actors and removes their fragments.
    DropStreamingJobs {
        /// Ids of the jobs being dropped (shown in the `Display` output).
        streaming_job_ids: HashSet<TableId>,
        /// Actors to stop via the `StopMutation`.
        actors: Vec<ActorId>,
        // NOTE(review): presumably the state tables to unregister from storage — confirm.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments reported as removed.
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Target fragment -> sink fragments that no longer feed it. The target gets a
        /// `DropNodeUpstream` change and the sink fragments are listed in the `StopMutation`.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a streaming job; emits an `AddMutation`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        // NOTE(review): presumably the upstream tables in other databases that are
        // snapshot-backfilled — confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Keyed by job table id; the value holds the set of upstream tables the job backfilled
    /// from and the job's inflight info. Emits a `DropSubscriptionsMutation` that removes
    /// the job's subscription on each of those upstream tables.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// Reschedules fragments; emits an `UpdateMutation`.
    RescheduleFragment {
        /// Per-fragment reschedule plan.
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Actors of each fragment; must contain every upstream and downstream fragment
        /// referenced by `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        // NOTE(review): presumably metadata updates applied after the reschedule — confirm.
        post_updates: JobReschedulePostUpdates,
    },

    /// Replaces the fragments of an existing streaming job.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Changes source split assignment; emits a `SourceChangeSplitMutation`.
    SourceChangeSplit(SplitState),

    /// Applies per-actor rate limits; emits a `ThrottleMutation`.
    Throttle(ThrottleConfig),

    /// Adds a subscription on `upstream_mv_table_id` with `subscription_id` as subscriber.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        // NOTE(review): presumably the change-log retention of the subscription, in
        // seconds — confirm.
        retention_second: u64,
    },

    /// Drops the subscription `subscription_id` on `upstream_mv_table_id`.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    /// Changes connector properties; emits a `ConnectorPropsChangeMutation`.
    ConnectorPropsChange(ConnectorPropsChange),

    // NOTE(review): presumably starts backfill for the listed fragments — confirm.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    // NOTE(review): presumably triggers a refresh of `table_id` from its associated
    // source — confirm.
    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    // NOTE(review): presumably signals that loading for `table_id` has finished — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}
401
402impl std::fmt::Display for Command {
404 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
405 match self {
406 Command::Flush => write!(f, "Flush"),
407 Command::Pause => write!(f, "Pause"),
408 Command::Resume => write!(f, "Resume"),
409 Command::DropStreamingJobs {
410 streaming_job_ids, ..
411 } => {
412 write!(
413 f,
414 "DropStreamingJobs: {}",
415 streaming_job_ids.iter().sorted().join(", ")
416 )
417 }
418 Command::CreateStreamingJob { info, .. } => {
419 write!(f, "CreateStreamingJob: {}", info.streaming_job)
420 }
421 Command::MergeSnapshotBackfillStreamingJobs(_) => {
422 write!(f, "MergeSnapshotBackfillStreamingJobs")
423 }
424 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
425 Command::ReplaceStreamJob(plan) => {
426 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
427 }
428 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
429 Command::Throttle(_) => write!(f, "Throttle"),
430 Command::CreateSubscription {
431 subscription_id, ..
432 } => write!(f, "CreateSubscription: {subscription_id}"),
433 Command::DropSubscription {
434 subscription_id, ..
435 } => write!(f, "DropSubscription: {subscription_id}"),
436 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
437 Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
438 Command::Refresh {
439 table_id,
440 associated_source_id,
441 } => write!(
442 f,
443 "Refresh: {} (source: {})",
444 table_id, associated_source_id
445 ),
446 Command::LoadFinish {
447 table_id,
448 associated_source_id,
449 } => write!(
450 f,
451 "LoadFinish: {} (source: {})",
452 table_id, associated_source_id
453 ),
454 }
455 }
456}
457
458impl Command {
459 pub fn pause() -> Self {
460 Self::Pause
461 }
462
463 pub fn resume() -> Self {
464 Self::Resume
465 }
466
467 pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
468 match self {
469 Command::Flush => None,
470 Command::Pause => None,
471 Command::Resume => None,
472 Command::DropStreamingJobs {
473 unregistered_fragment_ids,
474 dropped_sink_fragment_by_targets,
475 ..
476 } => {
477 let changes = unregistered_fragment_ids
478 .iter()
479 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
480 .chain(dropped_sink_fragment_by_targets.iter().map(
481 |(target_fragment, sink_fragments)| {
482 (
483 *target_fragment,
484 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
485 )
486 },
487 ))
488 .collect();
489
490 Some(changes)
491 }
492 Command::CreateStreamingJob { info, job_type, .. } => {
493 assert!(
494 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
495 "should handle fragment changes separately for snapshot backfill"
496 );
497 let mut changes: HashMap<_, _> = info
498 .stream_job_fragments
499 .new_fragment_info()
500 .map(|(fragment_id, fragment_info)| {
501 (
502 fragment_id,
503 CommandFragmentChanges::NewFragment {
504 job_id: info.streaming_job.id().into(),
505 info: fragment_info,
506 is_existing: false,
507 },
508 )
509 })
510 .collect();
511
512 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
513 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
514 changes.insert(
515 downstream_fragment_id,
516 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
517 upstream_fragment_id: ctx.sink_fragment_id,
518 sink_output_schema: ctx.sink_output_fields.clone(),
519 project_exprs: ctx.project_exprs.clone(),
520 }),
521 );
522 }
523
524 Some(changes)
525 }
526 Command::RescheduleFragment { reschedules, .. } => Some(
527 reschedules
528 .iter()
529 .map(|(fragment_id, reschedule)| {
530 (
531 *fragment_id,
532 CommandFragmentChanges::Reschedule {
533 new_actors: reschedule
534 .added_actors
535 .iter()
536 .flat_map(|(node_id, actors)| {
537 actors.iter().map(|actor_id| {
538 (
539 *actor_id,
540 InflightActorInfo {
541 worker_id: *node_id,
542 vnode_bitmap: reschedule
543 .newly_created_actors
544 .get(actor_id)
545 .expect("should exist")
546 .0
547 .0
548 .vnode_bitmap
549 .clone(),
550 },
551 )
552 })
553 })
554 .collect(),
555 actor_update_vnode_bitmap: reschedule
556 .vnode_bitmap_updates
557 .iter()
558 .filter(|(actor_id, _)| {
559 !reschedule.newly_created_actors.contains_key(actor_id)
561 })
562 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
563 .collect(),
564 to_remove: reschedule.removed_actors.iter().cloned().collect(),
565 },
566 )
567 })
568 .collect(),
569 ),
570 Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
571 Command::MergeSnapshotBackfillStreamingJobs(_) => None,
572 Command::SourceChangeSplit { .. } => None,
573 Command::Throttle(_) => None,
574 Command::CreateSubscription { .. } => None,
575 Command::DropSubscription { .. } => None,
576 Command::ConnectorPropsChange(_) => None,
577 Command::StartFragmentBackfill { .. } => None,
578 Command::Refresh { .. } => None, Command::LoadFinish { .. } => None, }
581 }
582
583 pub fn need_checkpoint(&self) -> bool {
584 !matches!(self, Command::Resume)
586 }
587}
588
/// The kind of a barrier.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier. The carried epochs are handed to the table change-log delta
    /// builder when the epoch is committed.
    // NOTE(review): presumably the prev-epochs of the barriers covered by this
    // checkpoint — confirm.
    Checkpoint(Vec<u64>),
}
596
597impl BarrierKind {
598 pub fn to_protobuf(&self) -> PbBarrierKind {
599 match self {
600 BarrierKind::Initial => PbBarrierKind::Initial,
601 BarrierKind::Barrier => PbBarrierKind::Barrier,
602 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
603 }
604 }
605
606 pub fn is_checkpoint(&self) -> bool {
607 matches!(self, BarrierKind::Checkpoint(_))
608 }
609
610 pub fn is_initial(&self) -> bool {
611 matches!(self, BarrierKind::Initial)
612 }
613
614 pub fn as_str_name(&self) -> &'static str {
615 match self {
616 BarrierKind::Initial => "Initial",
617 BarrierKind::Barrier => "Barrier",
618 BarrierKind::Checkpoint(_) => "Checkpoint",
619 }
620 }
621}
622
/// A barrier together with the optional [`Command`] it carries and the data needed to
/// commit its epoch.
pub(super) struct CommandContext {
    /// Subscriptions per materialized-view table; used to derive log-store truncate epochs.
    subscription_info: InflightSubscriptionInfo,

    /// Epochs and kind of the barrier.
    pub(super) barrier_info: BarrierInfo,

    /// Tables committed at the barrier's prev epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command carried by the barrier, if any.
    pub(super) command: Option<Command>,

    /// Tracing span stored with the context; never read in this file.
    // NOTE(review): presumably held so the span stays open for the context's lifetime — confirm.
    _span: tracing::Span,
}
641
642impl std::fmt::Debug for CommandContext {
643 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644 f.debug_struct("CommandContext")
645 .field("barrier_info", &self.barrier_info)
646 .field("command", &self.command)
647 .finish()
648 }
649}
650
651impl std::fmt::Display for CommandContext {
652 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653 write!(
654 f,
655 "prev_epoch={}, curr_epoch={}, kind={}",
656 self.barrier_info.prev_epoch.value().0,
657 self.barrier_info.curr_epoch.value().0,
658 self.barrier_info.kind.as_str_name()
659 )?;
660 if let Some(command) = &self.command {
661 write!(f, ", command={}", command)?;
662 }
663 Ok(())
664 }
665}
666
667impl CommandContext {
668 pub(super) fn new(
669 barrier_info: BarrierInfo,
670 subscription_info: InflightSubscriptionInfo,
671 table_ids_to_commit: HashSet<TableId>,
672 command: Option<Command>,
673 span: tracing::Span,
674 ) -> Self {
675 Self {
676 subscription_info,
677 barrier_info,
678 table_ids_to_commit,
679 command,
680 _span: span,
681 }
682 }
683
684 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
685 let Some(truncate_timestamptz) = Timestamptz::from_secs(
686 self.barrier_info
687 .prev_epoch
688 .value()
689 .as_timestamptz()
690 .timestamp()
691 - retention_second as i64,
692 ) else {
693 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
694 return self.barrier_info.prev_epoch.value();
695 };
696 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
697 }
698
699 pub(super) fn collect_commit_epoch_info(
700 &self,
701 info: &mut CommitEpochInfo,
702 resps: Vec<BarrierCompleteResponse>,
703 backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
704 ) {
705 let (
706 sst_to_context,
707 synced_ssts,
708 new_table_watermarks,
709 old_value_ssts,
710 vector_index_adds,
711 truncate_tables,
712 ) = collect_resp_info(resps);
713
714 let new_table_fragment_infos =
715 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
716 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
717 {
718 let table_fragments = &info.stream_job_fragments;
719 let mut table_ids: HashSet<_> = table_fragments
720 .internal_table_ids()
721 .into_iter()
722 .map(TableId::new)
723 .collect();
724 if let Some(mv_table_id) = table_fragments.mv_table_id() {
725 table_ids.insert(TableId::new(mv_table_id));
726 }
727
728 vec![NewTableFragmentInfo { table_ids }]
729 } else {
730 vec![]
731 };
732
733 let mut mv_log_store_truncate_epoch = HashMap::new();
734 let mut update_truncate_epoch =
736 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
737 .entry(table_id.table_id)
738 {
739 Entry::Occupied(mut entry) => {
740 let prev_truncate_epoch = entry.get_mut();
741 if truncate_epoch < *prev_truncate_epoch {
742 *prev_truncate_epoch = truncate_epoch;
743 }
744 }
745 Entry::Vacant(entry) => {
746 entry.insert(truncate_epoch);
747 }
748 };
749 for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
750 if let Some(truncate_epoch) = subscriptions
751 .values()
752 .max()
753 .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
754 {
755 update_truncate_epoch(*mv_table_id, truncate_epoch);
756 }
757 }
758 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
759 for mv_table_id in upstream_mv_table_ids {
760 update_truncate_epoch(mv_table_id, backfill_epoch);
761 }
762 }
763
764 let table_new_change_log = build_table_change_log_delta(
765 old_value_ssts.into_iter(),
766 synced_ssts.iter().map(|sst| &sst.sst_info),
767 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
768 mv_log_store_truncate_epoch.into_iter(),
769 );
770
771 let epoch = self.barrier_info.prev_epoch();
772 for table_id in &self.table_ids_to_commit {
773 info.tables_to_commit
774 .try_insert(*table_id, epoch)
775 .expect("non duplicate");
776 }
777
778 info.sstables.extend(synced_ssts);
779 info.new_table_watermarks.extend(new_table_watermarks);
780 info.sst_to_context.extend(sst_to_context);
781 info.new_table_fragment_infos
782 .extend(new_table_fragment_infos);
783 info.change_log_delta.extend(table_new_change_log);
784 for (table_id, vector_index_adds) in vector_index_adds {
785 info.vector_index_delta
786 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
787 .expect("non-duplicate");
788 }
789 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
790 for fragment in job_info.stream_job_fragments.fragments.values() {
791 visit_stream_node_cont(&fragment.nodes, |node| {
792 match node.node_body.as_ref().unwrap() {
793 NodeBody::VectorIndexWrite(vector_index_write) => {
794 let index_table = vector_index_write.table.as_ref().unwrap();
795 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
796 info.vector_index_delta
797 .try_insert(
798 index_table.id.into(),
799 VectorIndexDelta::Init(PbVectorIndexInit {
800 info: Some(index_table.vector_index_info.unwrap()),
801 }),
802 )
803 .expect("non-duplicate");
804 false
805 }
806 _ => true,
807 }
808 })
809 }
810 }
811 info.truncate_tables.extend(truncate_tables);
812 }
813}
814
815impl Command {
816 pub(super) fn to_mutation(
820 &self,
821 is_currently_paused: bool,
822 edges: &mut Option<FragmentEdgeBuildResult>,
823 control_stream_manager: &ControlStreamManager,
824 ) -> Option<Mutation> {
825 match self {
826 Command::Flush => None,
827
828 Command::Pause => {
829 if !is_currently_paused {
832 Some(Mutation::Pause(PauseMutation {}))
833 } else {
834 None
835 }
836 }
837
838 Command::Resume => {
839 if is_currently_paused {
841 Some(Mutation::Resume(ResumeMutation {}))
842 } else {
843 None
844 }
845 }
846
847 Command::SourceChangeSplit(SplitState {
848 split_assignment, ..
849 }) => {
850 let mut diff = HashMap::new();
851
852 for actor_splits in split_assignment.values() {
853 diff.extend(actor_splits.clone());
854 }
855
856 Some(Mutation::Splits(SourceChangeSplitMutation {
857 actor_splits: build_actor_connector_splits(&diff),
858 }))
859 }
860
861 Command::Throttle(config) => {
862 let mut actor_to_apply = HashMap::new();
863 for per_fragment in config.values() {
864 actor_to_apply.extend(
865 per_fragment
866 .iter()
867 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
868 );
869 }
870
871 Some(Mutation::Throttle(ThrottleMutation {
872 actor_throttle: actor_to_apply,
873 }))
874 }
875
876 Command::DropStreamingJobs {
877 actors,
878 dropped_sink_fragment_by_targets,
879 ..
880 } => Some(Mutation::Stop(StopMutation {
881 actors: actors.clone(),
882 dropped_sink_fragments: dropped_sink_fragment_by_targets
883 .values()
884 .flatten()
885 .cloned()
886 .collect(),
887 })),
888
889 Command::CreateStreamingJob {
890 info:
891 CreateStreamingJobCommandInfo {
892 stream_job_fragments: table_fragments,
893 init_split_assignment: split_assignment,
894 upstream_fragment_downstreams,
895 fragment_backfill_ordering,
896 cdc_table_snapshot_split_assignment,
897 ..
898 },
899 job_type,
900 ..
901 } => {
902 let edges = edges.as_mut().expect("should exist");
903 let added_actors = table_fragments.actor_ids();
904 let actor_splits = split_assignment
905 .values()
906 .flat_map(build_actor_connector_splits)
907 .collect();
908 let subscriptions_to_add =
909 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
910 job_type
911 {
912 snapshot_backfill_info
913 .upstream_mv_table_id_to_backfill_epoch
914 .keys()
915 .map(|table_id| SubscriptionUpstreamInfo {
916 subscriber_id: table_fragments.stream_job_id().table_id,
917 upstream_mv_table_id: table_id.table_id,
918 })
919 .collect()
920 } else {
921 Default::default()
922 };
923 let backfill_nodes_to_pause: Vec<_> =
924 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
925 .into_iter()
926 .collect();
927
928 let new_upstream_sinks =
929 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
930 sink_fragment_id,
931 sink_output_fields,
932 project_exprs,
933 new_sink_downstream,
934 ..
935 }) = job_type
936 {
937 let new_sink_actors = table_fragments
938 .actors_to_create()
939 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
940 .exactly_one()
941 .map(|(_, _, actors)| {
942 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
943 actor_id: actor.actor_id,
944 host: Some(control_stream_manager.host_addr(worker_id)),
945 })
946 })
947 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
948 let new_upstream_sink = PbNewUpstreamSink {
949 info: Some(PbUpstreamSinkInfo {
950 upstream_fragment_id: *sink_fragment_id,
951 sink_output_schema: sink_output_fields.clone(),
952 project_exprs: project_exprs.clone(),
953 }),
954 upstream_actors: new_sink_actors.collect(),
955 };
956 HashMap::from([(
957 new_sink_downstream.downstream_fragment_id,
958 new_upstream_sink.clone(),
959 )])
960 } else {
961 HashMap::new()
962 };
963
964 let add_mutation = AddMutation {
965 actor_dispatchers: edges
966 .dispatchers
967 .extract_if(|fragment_id, _| {
968 upstream_fragment_downstreams.contains_key(fragment_id)
969 })
970 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
971 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
972 .collect(),
973 added_actors,
974 actor_splits,
975 pause: is_currently_paused,
977 subscriptions_to_add,
978 backfill_nodes_to_pause,
979 actor_cdc_table_snapshot_splits:
980 build_pb_actor_cdc_table_snapshot_splits_with_generation(
981 cdc_table_snapshot_split_assignment.clone(),
982 )
983 .into(),
984 new_upstream_sinks,
985 };
986
987 Some(Mutation::Add(add_mutation))
988 }
989 Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
990 Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
991 info: jobs_to_merge
992 .iter()
993 .flat_map(|(table_id, (backfill_upstream_tables, _))| {
994 backfill_upstream_tables
995 .iter()
996 .map(move |upstream_table_id| SubscriptionUpstreamInfo {
997 subscriber_id: table_id.table_id,
998 upstream_mv_table_id: upstream_table_id.table_id,
999 })
1000 })
1001 .collect(),
1002 }))
1003 }
1004
1005 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1006 old_fragments,
1007 replace_upstream,
1008 upstream_fragment_downstreams,
1009 init_split_assignment,
1010 auto_refresh_schema_sinks,
1011 cdc_table_snapshot_split_assignment,
1012 ..
1013 }) => {
1014 let edges = edges.as_mut().expect("should exist");
1015 let merge_updates = edges
1016 .merge_updates
1017 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1018 .collect();
1019 let dispatchers = edges
1020 .dispatchers
1021 .extract_if(|fragment_id, _| {
1022 upstream_fragment_downstreams.contains_key(fragment_id)
1023 })
1024 .collect();
1025 let cdc_table_snapshot_split_assignment =
1026 if cdc_table_snapshot_split_assignment.is_empty() {
1027 CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1028 } else {
1029 CdcTableSnapshotSplitAssignmentWithGeneration::new(
1030 cdc_table_snapshot_split_assignment.clone(),
1031 control_stream_manager
1032 .env
1033 .cdc_table_backfill_tracker
1034 .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
1035 )
1036 };
1037 Self::generate_update_mutation_for_replace_table(
1038 old_fragments.actor_ids().into_iter().chain(
1039 auto_refresh_schema_sinks
1040 .as_ref()
1041 .into_iter()
1042 .flat_map(|sinks| {
1043 sinks.iter().flat_map(|sink| {
1044 sink.original_fragment
1045 .actors
1046 .iter()
1047 .map(|actor| actor.actor_id)
1048 })
1049 }),
1050 ),
1051 merge_updates,
1052 dispatchers,
1053 init_split_assignment,
1054 cdc_table_snapshot_split_assignment,
1055 auto_refresh_schema_sinks.as_ref(),
1056 )
1057 }
1058
1059 Command::RescheduleFragment {
1060 reschedules,
1061 fragment_actors,
1062 ..
1063 } => {
1064 let mut dispatcher_update = HashMap::new();
1065 for reschedule in reschedules.values() {
1066 for &(upstream_fragment_id, dispatcher_id) in
1067 &reschedule.upstream_fragment_dispatcher_ids
1068 {
1069 let upstream_actor_ids = fragment_actors
1071 .get(&upstream_fragment_id)
1072 .expect("should contain");
1073
1074 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1075
1076 for &actor_id in upstream_actor_ids {
1078 let added_downstream_actor_id = if upstream_reschedule
1079 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1080 .unwrap_or(true)
1081 {
1082 reschedule
1083 .added_actors
1084 .values()
1085 .flatten()
1086 .cloned()
1087 .collect()
1088 } else {
1089 Default::default()
1090 };
1091 dispatcher_update
1093 .try_insert(
1094 (actor_id, dispatcher_id),
1095 DispatcherUpdate {
1096 actor_id,
1097 dispatcher_id,
1098 hash_mapping: reschedule
1099 .upstream_dispatcher_mapping
1100 .as_ref()
1101 .map(|m| m.to_protobuf()),
1102 added_downstream_actor_id,
1103 removed_downstream_actor_id: reschedule
1104 .removed_actors
1105 .iter()
1106 .cloned()
1107 .collect(),
1108 },
1109 )
1110 .unwrap();
1111 }
1112 }
1113 }
1114 let dispatcher_update = dispatcher_update.into_values().collect();
1115
1116 let mut merge_update = HashMap::new();
1117 for (&fragment_id, reschedule) in reschedules {
1118 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1119 let downstream_actor_ids = fragment_actors
1121 .get(&downstream_fragment_id)
1122 .expect("should contain");
1123
1124 let downstream_removed_actors: HashSet<_> = reschedules
1128 .get(&downstream_fragment_id)
1129 .map(|downstream_reschedule| {
1130 downstream_reschedule
1131 .removed_actors
1132 .iter()
1133 .copied()
1134 .collect()
1135 })
1136 .unwrap_or_default();
1137
1138 for &actor_id in downstream_actor_ids {
1140 if downstream_removed_actors.contains(&actor_id) {
1141 continue;
1142 }
1143
1144 merge_update
1146 .try_insert(
1147 (actor_id, fragment_id),
1148 MergeUpdate {
1149 actor_id,
1150 upstream_fragment_id: fragment_id,
1151 new_upstream_fragment_id: None,
1152 added_upstream_actors: reschedule
1153 .added_actors
1154 .iter()
1155 .flat_map(|(worker_id, actors)| {
1156 let host =
1157 control_stream_manager.host_addr(*worker_id);
1158 actors.iter().map(move |actor_id| PbActorInfo {
1159 actor_id: *actor_id,
1160 host: Some(host.clone()),
1161 })
1162 })
1163 .collect(),
1164 removed_upstream_actor_id: reschedule
1165 .removed_actors
1166 .iter()
1167 .cloned()
1168 .collect(),
1169 },
1170 )
1171 .unwrap();
1172 }
1173 }
1174 }
1175 let merge_update = merge_update.into_values().collect();
1176
1177 let mut actor_vnode_bitmap_update = HashMap::new();
1178 for reschedule in reschedules.values() {
1179 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1181 let bitmap = bitmap.to_protobuf();
1182 actor_vnode_bitmap_update
1183 .try_insert(actor_id, bitmap)
1184 .unwrap();
1185 }
1186 }
1187 let dropped_actors = reschedules
1188 .values()
1189 .flat_map(|r| r.removed_actors.iter().copied())
1190 .collect();
1191 let mut actor_splits = HashMap::new();
1192 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1193 let mut cdc_table_ids: HashSet<_> = HashSet::default();
1194 for reschedule in reschedules.values() {
1195 for (actor_id, splits) in &reschedule.actor_splits {
1196 actor_splits.insert(
1197 *actor_id as ActorId,
1198 ConnectorSplits {
1199 splits: splits.iter().map(ConnectorSplit::from).collect(),
1200 },
1201 );
1202 }
1203 actor_cdc_table_snapshot_splits.extend(
1204 build_pb_actor_cdc_table_snapshot_splits(
1205 reschedule.cdc_table_snapshot_split_assignment.clone(),
1206 ),
1207 );
1208 if let Some(cdc_table_id) = reschedule.cdc_table_id {
1209 cdc_table_ids.insert(cdc_table_id);
1210 }
1211 }
1212
1213 let actor_new_dispatchers = HashMap::new();
1215 let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1216 {
1217 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1218 CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1219 )
1220 .into()
1221 } else {
1222 PbCdcTableSnapshotSplitsWithGeneration {
1223 splits: actor_cdc_table_snapshot_splits,
1224 generation: control_stream_manager
1225 .env
1226 .cdc_table_backfill_tracker
1227 .next_generation(cdc_table_ids.into_iter()),
1228 }
1229 .into()
1230 };
1231 let mutation = Mutation::Update(UpdateMutation {
1232 dispatcher_update,
1233 merge_update,
1234 actor_vnode_bitmap_update,
1235 dropped_actors,
1236 actor_splits,
1237 actor_new_dispatchers,
1238 actor_cdc_table_snapshot_splits,
1239 sink_add_columns: Default::default(),
1240 });
1241 tracing::debug!("update mutation: {mutation:?}");
1242 Some(mutation)
1243 }
1244
1245 Command::CreateSubscription {
1246 upstream_mv_table_id,
1247 subscription_id,
1248 ..
1249 } => Some(Mutation::Add(AddMutation {
1250 actor_dispatchers: Default::default(),
1251 added_actors: vec![],
1252 actor_splits: Default::default(),
1253 pause: false,
1254 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1255 upstream_mv_table_id: upstream_mv_table_id.table_id,
1256 subscriber_id: *subscription_id,
1257 }],
1258 backfill_nodes_to_pause: vec![],
1259 actor_cdc_table_snapshot_splits: Default::default(),
1260 new_upstream_sinks: Default::default(),
1261 })),
1262 Command::DropSubscription {
1263 upstream_mv_table_id,
1264 subscription_id,
1265 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1266 info: vec![SubscriptionUpstreamInfo {
1267 subscriber_id: *subscription_id,
1268 upstream_mv_table_id: upstream_mv_table_id.table_id,
1269 }],
1270 })),
1271 Command::ConnectorPropsChange(config) => {
1272 let mut connector_props_infos = HashMap::default();
1273 for (k, v) in config {
1274 connector_props_infos.insert(
1275 *k,
1276 ConnectorPropsInfo {
1277 connector_props_info: v.clone(),
1278 },
1279 );
1280 }
1281 Some(Mutation::ConnectorPropsChange(
1282 ConnectorPropsChangeMutation {
1283 connector_props_infos,
1284 },
1285 ))
1286 }
1287 Command::StartFragmentBackfill { fragment_ids } => Some(
1288 Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1289 fragment_ids: fragment_ids.clone(),
1290 }),
1291 ),
1292 Command::Refresh {
1293 table_id,
1294 associated_source_id,
1295 } => Some(Mutation::RefreshStart(
1296 risingwave_pb::stream_plan::RefreshStartMutation {
1297 table_id: table_id.table_id,
1298 associated_source_id: associated_source_id.table_id,
1299 },
1300 )),
1301 Command::LoadFinish {
1302 table_id: _,
1303 associated_source_id,
1304 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1305 associated_source_id: associated_source_id.table_id,
1306 })),
1307 }
1308 }
1309
1310 pub(super) fn actors_to_create(
1311 &self,
1312 graph_info: &InflightDatabaseInfo,
1313 edges: &mut Option<FragmentEdgeBuildResult>,
1314 control_stream_manager: &ControlStreamManager,
1315 ) -> Option<StreamJobActorsToCreate> {
1316 match self {
1317 Command::CreateStreamingJob { info, job_type, .. } => {
1318 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1319 return None;
1321 }
1322 let actors_to_create = info.stream_job_fragments.actors_to_create();
1323 let edges = edges.as_mut().expect("should exist");
1324 Some(edges.collect_actors_to_create(actors_to_create))
1325 }
1326 Command::RescheduleFragment {
1327 reschedules,
1328 fragment_actors,
1329 ..
1330 } => {
1331 let mut actor_upstreams = Self::collect_actor_upstreams(
1332 reschedules.iter().map(|(fragment_id, reschedule)| {
1333 (
1334 *fragment_id,
1335 reschedule.newly_created_actors.values().map(
1336 |((actor, dispatchers), _)| {
1337 (actor.actor_id, dispatchers.as_slice())
1338 },
1339 ),
1340 )
1341 }),
1342 Some((reschedules, fragment_actors)),
1343 graph_info,
1344 control_stream_manager,
1345 );
1346 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
1347 for (fragment_id, (actor, dispatchers), worker_id) in
1348 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1349 reschedule
1350 .newly_created_actors
1351 .values()
1352 .map(|(actors, status)| (*fragment_id, actors, status))
1353 })
1354 {
1355 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1356 map.entry(*worker_id)
1357 .or_default()
1358 .entry(fragment_id)
1359 .or_insert_with(|| {
1360 let node = graph_info.fragment(fragment_id).nodes.clone();
1361 (node, vec![])
1362 })
1363 .1
1364 .push((actor.clone(), upstreams, dispatchers.clone()));
1365 }
1366 Some(map)
1367 }
1368 Command::ReplaceStreamJob(replace_table) => {
1369 let edges = edges.as_mut().expect("should exist");
1370 let mut actors =
1371 edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
1372 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1373 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1374 (
1375 sink.new_fragment.fragment_id,
1376 &sink.new_fragment.nodes,
1377 sink.new_fragment.actors.iter().map(|actor| {
1378 (
1379 actor,
1380 sink.actor_status[&actor.actor_id]
1381 .location
1382 .as_ref()
1383 .unwrap()
1384 .worker_node_id as _,
1385 )
1386 }),
1387 )
1388 }));
1389 for (worker_id, fragment_actors) in sink_actors {
1390 actors.entry(worker_id).or_default().extend(fragment_actors);
1391 }
1392 }
1393 Some(actors)
1394 }
1395 _ => None,
1396 }
1397 }
1398
1399 fn generate_update_mutation_for_replace_table(
1400 dropped_actors: impl IntoIterator<Item = ActorId>,
1401 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1402 dispatchers: FragmentActorDispatchers,
1403 init_split_assignment: &SplitAssignment,
1404 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1405 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1406 ) -> Option<Mutation> {
1407 let dropped_actors = dropped_actors.into_iter().collect();
1408
1409 let actor_new_dispatchers = dispatchers
1410 .into_values()
1411 .flatten()
1412 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1413 .collect();
1414
1415 let actor_splits = init_split_assignment
1416 .values()
1417 .flat_map(build_actor_connector_splits)
1418 .collect();
1419 Some(Mutation::Update(UpdateMutation {
1420 actor_new_dispatchers,
1421 merge_update: merge_updates.into_values().flatten().collect(),
1422 dropped_actors,
1423 actor_splits,
1424 actor_cdc_table_snapshot_splits:
1425 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1426 cdc_table_snapshot_split_assignment,
1427 )
1428 .into(),
1429 sink_add_columns: auto_refresh_schema_sinks
1430 .as_ref()
1431 .into_iter()
1432 .flat_map(|sinks| {
1433 sinks.iter().map(|sink| {
1434 (
1435 sink.original_sink.id,
1436 PbSinkAddColumns {
1437 fields: sink
1438 .newly_add_fields
1439 .iter()
1440 .map(|field| field.to_prost())
1441 .collect(),
1442 },
1443 )
1444 })
1445 })
1446 .collect(),
1447 ..Default::default()
1448 }))
1449 }
1450
1451 pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
1453 match self {
1454 Command::DropStreamingJobs {
1455 streaming_job_ids, ..
1456 } => Some(streaming_job_ids.iter().cloned()),
1457 _ => None,
1458 }
1459 .into_iter()
1460 .flatten()
1461 }
1462}
1463
1464impl Command {
1465 #[expect(clippy::type_complexity)]
1466 pub(super) fn collect_actor_upstreams(
1467 actor_dispatchers: impl Iterator<
1468 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1469 >,
1470 reschedule_dispatcher_update: Option<(
1471 &HashMap<FragmentId, Reschedule>,
1472 &HashMap<FragmentId, HashSet<ActorId>>,
1473 )>,
1474 graph_info: &InflightDatabaseInfo,
1475 control_stream_manager: &ControlStreamManager,
1476 ) -> HashMap<ActorId, ActorUpstreams> {
1477 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1478 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1479 let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1480 for (upstream_actor_id, dispatchers) in upstream_actors {
1481 let upstream_actor_location =
1482 upstream_fragment.actors[&upstream_actor_id].worker_id;
1483 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1484 for downstream_actor_id in dispatchers
1485 .iter()
1486 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1487 {
1488 actor_upstreams
1489 .entry(*downstream_actor_id)
1490 .or_default()
1491 .entry(upstream_fragment_id)
1492 .or_default()
1493 .insert(
1494 upstream_actor_id,
1495 PbActorInfo {
1496 actor_id: upstream_actor_id,
1497 host: Some(upstream_actor_host.clone()),
1498 },
1499 );
1500 }
1501 }
1502 }
1503 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1504 for reschedule in reschedules.values() {
1505 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1506 let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1507 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1508 for upstream_actor_id in fragment_actors
1509 .get(upstream_fragment_id)
1510 .expect("should exist")
1511 {
1512 let upstream_actor_location =
1513 upstream_fragment.actors[upstream_actor_id].worker_id;
1514 let upstream_actor_host =
1515 control_stream_manager.host_addr(upstream_actor_location);
1516 if let Some(upstream_reschedule) = upstream_reschedule
1517 && upstream_reschedule
1518 .removed_actors
1519 .contains(upstream_actor_id)
1520 {
1521 continue;
1522 }
1523 for (_, downstream_actor_id) in
1524 reschedule
1525 .added_actors
1526 .iter()
1527 .flat_map(|(worker_id, actors)| {
1528 actors.iter().map(|actor| (*worker_id, *actor))
1529 })
1530 {
1531 actor_upstreams
1532 .entry(downstream_actor_id)
1533 .or_default()
1534 .entry(*upstream_fragment_id)
1535 .or_default()
1536 .insert(
1537 *upstream_actor_id,
1538 PbActorInfo {
1539 actor_id: *upstream_actor_id,
1540 host: Some(upstream_actor_host.clone()),
1541 },
1542 );
1543 }
1544 }
1545 }
1546 }
1547 }
1548 actor_upstreams
1549 }
1550}