1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::ControlStreamManager;
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76 SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
/// Describes how the set of actors of a single fragment changes in a
/// [`Command::RescheduleFragment`].
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to the fragment, grouped by the worker that will host them.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from the fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmaps to apply to actors. May also contain entries for newly created actors;
    /// `Command::fragment_changes` filters those out when updating existing actors.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher)` pairs whose dispatchers receive a `DispatcherUpdate`
    /// for this reschedule.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Hash mapping sent along with the `DispatcherUpdate` of the upstream dispatchers, if any.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose actors receive a `MergeUpdate` for this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Splits assigned to actors by this reschedule.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Definition and hosting worker of each newly created actor. Every actor listed in
    /// `added_actors` must have an entry here, otherwise `Command::fragment_changes` panics.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
110
/// Everything needed to replace the fragments of a streaming job with a new set of fragments.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments being replaced; each one is reported as `RemoveFragment`.
    pub old_fragments: StreamJobFragments,
    /// Fragments replacing the old ones; each one is reported as `NewFragment`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// For each affected fragment, the mapping from an upstream fragment id to the fragment id
    /// that replaces it.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Keyed by upstream fragment id; the dispatchers of these fragments are extracted when the
    /// update mutation is built.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split assignment used to initialise the actors of `new_fragments`.
    pub init_split_assignment: SplitAssignment,
    /// The streaming job being replaced; its id is used as the job id of the new fragments.
    pub streaming_job: StreamingJob,
    /// NOTE(review): presumably the temporary id under which the new job was created — not read
    /// in the visible part of this file, confirm against callers.
    pub tmp_id: JobId,
    /// NOTE(review): presumably state tables to drop once the replacement completes — not read
    /// in the visible part of this file, confirm against callers.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose fragment is swapped together with this job: for each one the original
    /// fragment is removed and a new fragment is added.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
139
140impl ReplaceStreamJobPlan {
141 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142 let mut fragment_changes = HashMap::new();
143 for (fragment_id, new_fragment) in self
144 .new_fragments
145 .new_fragment_info(&self.init_split_assignment)
146 {
147 let fragment_change = CommandFragmentChanges::NewFragment {
148 job_id: self.streaming_job.id(),
149 info: new_fragment,
150 };
151 fragment_changes
152 .try_insert(fragment_id, fragment_change)
153 .expect("non-duplicate");
154 }
155 for fragment in self.old_fragments.fragments.values() {
156 fragment_changes
157 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158 .expect("non-duplicate");
159 }
160 for (fragment_id, replace_map) in &self.replace_upstream {
161 fragment_changes
162 .try_insert(
163 *fragment_id,
164 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165 )
166 .expect("non-duplicate");
167 }
168 if let Some(sinks) = &self.auto_refresh_schema_sinks {
169 for sink in sinks {
170 let fragment_change = CommandFragmentChanges::NewFragment {
171 job_id: sink.original_sink.id.as_job_id(),
172 info: sink.new_fragment_info(),
173 };
174 fragment_changes
175 .try_insert(sink.new_fragment.fragment_id, fragment_change)
176 .expect("non-duplicate");
177 fragment_changes
178 .try_insert(
179 sink.original_fragment.fragment_id,
180 CommandFragmentChanges::RemoveFragment,
181 )
182 .expect("non-duplicate");
183 }
184 }
185 fragment_changes
186 }
187
188 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190 let mut fragment_replacements = HashMap::new();
191 for (upstream_fragment_id, new_upstream_fragment_id) in
192 self.replace_upstream.values().flatten()
193 {
194 {
195 let r =
196 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197 if let Some(r) = r {
198 assert_eq!(
199 *new_upstream_fragment_id, r,
200 "one fragment is replaced by multiple fragments"
201 );
202 }
203 }
204 }
205 fragment_replacements
206 }
207}
208
/// Information shared by all flavours of [`Command::CreateStreamingJob`].
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the job being created. Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Keyed by upstream fragment id; the dispatchers of these fragments are attached to the
    /// `AddMutation` of the new job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split assignment used to initialise the actors of the new job.
    pub init_split_assignment: SplitAssignment,
    /// NOTE(review): presumably the SQL definition of the job — not read in the visible part of
    /// this file.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    /// The job being created; its id is used as the job id of the new fragments.
    pub streaming_job: StreamingJob,
    /// Backfill ordering between fragments; used to derive `backfill_nodes_to_pause` of the
    /// `AddMutation`.
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    /// When `Some`, a `CdcTableBackfillTracker` is created for the job's parallel CDC backfill
    /// fragment with these splits.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// NOTE(review): not read in the visible part of this file; presumably maps locality
    /// fragments to their state tables — confirm against callers.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}
224
225impl StreamJobFragments {
226 pub(super) fn new_fragment_info<'a>(
227 &'a self,
228 assignment: &'a SplitAssignment,
229 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
230 self.fragments.values().map(|fragment| {
231 let mut fragment_splits = assignment
232 .get(&fragment.fragment_id)
233 .cloned()
234 .unwrap_or_default();
235
236 (
237 fragment.fragment_id,
238 InflightFragmentInfo {
239 fragment_id: fragment.fragment_id,
240 distribution_type: fragment.distribution_type.into(),
241 fragment_type_mask: fragment.fragment_type_mask,
242 vnode_count: fragment.vnode_count(),
243 nodes: fragment.nodes.clone(),
244 actors: fragment
245 .actors
246 .iter()
247 .map(|actor| {
248 (
249 actor.actor_id,
250 InflightActorInfo {
251 worker_id: self
252 .actor_status
253 .get(&actor.actor_id)
254 .expect("should exist")
255 .worker_id(),
256 vnode_bitmap: actor.vnode_bitmap.clone(),
257 splits: fragment_splits
258 .remove(&actor.actor_id)
259 .unwrap_or_default(),
260 },
261 )
262 })
263 .collect(),
264 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
265 },
266 )
267 })
268 }
269}
270
/// Upstream materialized-view tables that a snapshot-backfill job reads from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Maps each upstream mv table id to its backfill epoch.
    /// NOTE(review): the meaning of `None` is not visible here — presumably the epoch has not
    /// been decided yet; confirm against callers.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
278
/// The flavour of a [`Command::CreateStreamingJob`].
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A job with no extra handling.
    Normal,
    /// A sink into a table: the sink fragment is additionally registered as a new upstream of
    /// the target table's downstream fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// A job created with snapshot backfill: it subscribes to the listed upstream mv tables.
    /// `Command::fragment_changes` must not be called for this variant (it asserts).
    SnapshotBackfill(SnapshotBackfillInfo),
}
285
/// A command carried by a barrier. `Command::to_mutation` turns it into the barrier mutation
/// (if any) and `Command::fragment_changes` reports how it changes the in-flight fragments.
#[derive(Debug)]
pub enum Command {
    /// Carries no mutation and no fragment changes.
    Flush,

    /// Emits a `PauseMutation` unless the stream is already paused.
    Pause,

    /// Emits a `ResumeMutation` only when the stream is currently paused. This is the only
    /// command for which `need_checkpoint` returns `false`.
    Resume,

    /// Drops streaming jobs: emits a `StopMutation` and removes the listed fragments from the
    /// in-flight info.
    DropStreamingJobs {
        /// Ids of the jobs being dropped.
        streaming_job_ids: HashSet<JobId>,
        /// Actors to stop.
        actors: Vec<ActorId>,
        /// NOTE(review): presumably the state tables to unregister — not read in the visible
        /// part of this file.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments to remove from the in-flight info.
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Dropped sink fragments, keyed by the target fragment they are an upstream of.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a streaming job: emits an `AddMutation` for its actors and registers its
    /// fragments as new in-flight fragments.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// NOTE(review): not read in the visible part of this file; presumably the upstream
        /// tables from other databases that are snapshot-backfilled — confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedules the actors of a set of fragments via an `UpdateMutation`.
    RescheduleFragment {
        /// The reschedule plan of each affected fragment.
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Actors of the fragments involved. Must contain every upstream and downstream
        /// fragment referenced by `reschedules`, otherwise building the mutation panics.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// Replaces the fragments of a streaming job according to the plan.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Changes the split assignment of source actors via a `SourceChangeSplitMutation`.
    SourceChangeSplit(SplitState),

    /// Applies rate limits to actors via a `ThrottleMutation`.
    Throttle(ThrottleConfig),

    /// Registers a subscription on an upstream mv table via an `AddMutation` that only carries
    /// `subscriptions_to_add`.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// NOTE(review): presumably the log retention of the subscription in seconds — not
        /// read in the visible part of this file.
        retention_second: u64,
    },

    /// Removes a subscription from an upstream mv table via a `DropSubscriptionsMutation`.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Propagates changed connector properties via a `ConnectorPropsChangeMutation`.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Starts a refresh of `table_id` via a `RefreshStartMutation`.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Emits a `ListFinishMutation` for `associated_source_id`.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// NOTE(review): presumably emits a `LoadFinishMutation`; its handling lies outside the
    /// visible part of this file.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}
396
397impl std::fmt::Display for Command {
399 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
400 match self {
401 Command::Flush => write!(f, "Flush"),
402 Command::Pause => write!(f, "Pause"),
403 Command::Resume => write!(f, "Resume"),
404 Command::DropStreamingJobs {
405 streaming_job_ids, ..
406 } => {
407 write!(
408 f,
409 "DropStreamingJobs: {}",
410 streaming_job_ids.iter().sorted().join(", ")
411 )
412 }
413 Command::CreateStreamingJob { info, .. } => {
414 write!(f, "CreateStreamingJob: {}", info.streaming_job)
415 }
416 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
417 Command::ReplaceStreamJob(plan) => {
418 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
419 }
420 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
421 Command::Throttle(_) => write!(f, "Throttle"),
422 Command::CreateSubscription {
423 subscription_id, ..
424 } => write!(f, "CreateSubscription: {subscription_id}"),
425 Command::DropSubscription {
426 subscription_id, ..
427 } => write!(f, "DropSubscription: {subscription_id}"),
428 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
429 Command::Refresh {
430 table_id,
431 associated_source_id,
432 } => write!(
433 f,
434 "Refresh: {} (source: {})",
435 table_id, associated_source_id
436 ),
437 Command::ListFinish {
438 table_id,
439 associated_source_id,
440 } => write!(
441 f,
442 "ListFinish: {} (source: {})",
443 table_id, associated_source_id
444 ),
445 Command::LoadFinish {
446 table_id,
447 associated_source_id,
448 } => write!(
449 f,
450 "LoadFinish: {} (source: {})",
451 table_id, associated_source_id
452 ),
453 }
454 }
455}
456
457impl Command {
458 pub fn pause() -> Self {
459 Self::Pause
460 }
461
462 pub fn resume() -> Self {
463 Self::Resume
464 }
465
466 #[expect(clippy::type_complexity)]
467 pub(super) fn fragment_changes(
468 &self,
469 ) -> Option<(
470 Option<(JobId, Option<CdcTableBackfillTracker>)>,
471 HashMap<FragmentId, CommandFragmentChanges>,
472 )> {
473 match self {
474 Command::Flush => None,
475 Command::Pause => None,
476 Command::Resume => None,
477 Command::DropStreamingJobs {
478 unregistered_fragment_ids,
479 dropped_sink_fragment_by_targets,
480 ..
481 } => {
482 let changes = unregistered_fragment_ids
483 .iter()
484 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
485 .chain(dropped_sink_fragment_by_targets.iter().map(
486 |(target_fragment, sink_fragments)| {
487 (
488 *target_fragment,
489 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
490 )
491 },
492 ))
493 .collect();
494
495 Some((None, changes))
496 }
497 Command::CreateStreamingJob { info, job_type, .. } => {
498 assert!(
499 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
500 "should handle fragment changes separately for snapshot backfill"
501 );
502 let mut changes: HashMap<_, _> = info
503 .stream_job_fragments
504 .new_fragment_info(&info.init_split_assignment)
505 .map(|(fragment_id, fragment_infos)| {
506 (
507 fragment_id,
508 CommandFragmentChanges::NewFragment {
509 job_id: info.streaming_job.id(),
510 info: fragment_infos,
511 },
512 )
513 })
514 .collect();
515
516 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
517 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
518 changes.insert(
519 downstream_fragment_id,
520 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
521 upstream_fragment_id: ctx.sink_fragment_id,
522 sink_output_schema: ctx.sink_output_fields.clone(),
523 project_exprs: ctx.project_exprs.clone(),
524 }),
525 );
526 }
527
528 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
529 let (fragment, _) =
530 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
531 .expect("should have parallel cdc fragment");
532 Some(CdcTableBackfillTracker::new(
533 fragment.fragment_id,
534 splits.clone(),
535 ))
536 } else {
537 None
538 };
539
540 Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
541 }
542 Command::RescheduleFragment { reschedules, .. } => Some((
543 None,
544 reschedules
545 .iter()
546 .map(|(fragment_id, reschedule)| {
547 (
548 *fragment_id,
549 CommandFragmentChanges::Reschedule {
550 new_actors: reschedule
551 .added_actors
552 .iter()
553 .flat_map(|(node_id, actors)| {
554 actors.iter().map(|actor_id| {
555 (
556 *actor_id,
557 InflightActorInfo {
558 worker_id: *node_id,
559 vnode_bitmap: reschedule
560 .newly_created_actors
561 .get(actor_id)
562 .expect("should exist")
563 .0
564 .0
565 .vnode_bitmap
566 .clone(),
567 splits: reschedule
568 .actor_splits
569 .get(actor_id)
570 .cloned()
571 .unwrap_or_default(),
572 },
573 )
574 })
575 })
576 .collect(),
577 actor_update_vnode_bitmap: reschedule
578 .vnode_bitmap_updates
579 .iter()
580 .filter(|(actor_id, _)| {
581 !reschedule.newly_created_actors.contains_key(actor_id)
583 })
584 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
585 .collect(),
586 to_remove: reschedule.removed_actors.iter().cloned().collect(),
587 actor_splits: reschedule.actor_splits.clone(),
588 },
589 )
590 })
591 .collect(),
592 )),
593 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
594 Command::SourceChangeSplit(SplitState {
595 split_assignment, ..
596 }) => Some((
597 None,
598 split_assignment
599 .iter()
600 .map(|(&fragment_id, splits)| {
601 (
602 fragment_id,
603 CommandFragmentChanges::SplitAssignment {
604 actor_splits: splits.clone(),
605 },
606 )
607 })
608 .collect(),
609 )),
610 Command::Throttle(_) => None,
611 Command::CreateSubscription { .. } => None,
612 Command::DropSubscription { .. } => None,
613 Command::ConnectorPropsChange(_) => None,
614 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, }
618 }
619
620 pub fn need_checkpoint(&self) -> bool {
621 !matches!(self, Command::Resume)
623 }
624}
625
/// Kind of a barrier as tracked on the meta side; converted to `PbBarrierKind` by
/// `BarrierKind::to_protobuf`.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier. The carried epochs are passed to `build_table_change_log_delta`
    /// when the epoch is committed.
    /// NOTE(review): presumably the prev-epochs of the preceding non-checkpoint barriers plus
    /// the prev-epoch of this one — confirm where the value is constructed.
    Checkpoint(Vec<u64>),
}
633
634impl BarrierKind {
635 pub fn to_protobuf(&self) -> PbBarrierKind {
636 match self {
637 BarrierKind::Initial => PbBarrierKind::Initial,
638 BarrierKind::Barrier => PbBarrierKind::Barrier,
639 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
640 }
641 }
642
643 pub fn is_checkpoint(&self) -> bool {
644 matches!(self, BarrierKind::Checkpoint(_))
645 }
646
647 pub fn is_initial(&self) -> bool {
648 matches!(self, BarrierKind::Initial)
649 }
650
651 pub fn as_str_name(&self) -> &'static str {
652 match self {
653 BarrierKind::Initial => "Initial",
654 BarrierKind::Barrier => "Barrier",
655 BarrierKind::Checkpoint(_) => "Checkpoint",
656 }
657 }
658}
659
/// The state kept for one injected barrier: the barrier itself, the optional command it
/// carries, and what is needed to commit its epoch once it has been collected.
pub(super) struct CommandContext {
    /// Maximum subscription retention per mv table; used to derive the truncate epoch of each
    /// table's change log when the epoch is committed.
    /// NOTE(review): the value is passed to `get_truncate_epoch` as `retention_second`, so it
    /// is presumably in seconds.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    /// Tables committed at the barrier's prev epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command carried by the barrier, if any.
    pub(super) command: Option<Command>,

    /// Never read in the visible part of this file.
    /// NOTE(review): presumably held only so that the span stays alive as long as the context.
    _span: tracing::Span,
}
678
679impl std::fmt::Debug for CommandContext {
680 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
681 f.debug_struct("CommandContext")
682 .field("barrier_info", &self.barrier_info)
683 .field("command", &self.command)
684 .finish()
685 }
686}
687
688impl std::fmt::Display for CommandContext {
689 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
690 write!(
691 f,
692 "prev_epoch={}, curr_epoch={}, kind={}",
693 self.barrier_info.prev_epoch(),
694 self.barrier_info.curr_epoch(),
695 self.barrier_info.kind.as_str_name()
696 )?;
697 if let Some(command) = &self.command {
698 write!(f, ", command={}", command)?;
699 }
700 Ok(())
701 }
702}
703
704impl CommandContext {
705 pub(super) fn new(
706 barrier_info: BarrierInfo,
707 mv_subscription_max_retention: HashMap<TableId, u64>,
708 table_ids_to_commit: HashSet<TableId>,
709 command: Option<Command>,
710 span: tracing::Span,
711 ) -> Self {
712 Self {
713 mv_subscription_max_retention,
714 barrier_info,
715 table_ids_to_commit,
716 command,
717 _span: span,
718 }
719 }
720
721 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
722 let Some(truncate_timestamptz) = Timestamptz::from_secs(
723 self.barrier_info
724 .prev_epoch
725 .value()
726 .as_timestamptz()
727 .timestamp()
728 - retention_second as i64,
729 ) else {
730 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
731 return self.barrier_info.prev_epoch.value();
732 };
733 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
734 }
735
736 pub(super) fn collect_commit_epoch_info(
737 &self,
738 info: &mut CommitEpochInfo,
739 resps: Vec<BarrierCompleteResponse>,
740 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
741 ) {
742 let (
743 sst_to_context,
744 synced_ssts,
745 new_table_watermarks,
746 old_value_ssts,
747 vector_index_adds,
748 truncate_tables,
749 ) = collect_resp_info(resps);
750
751 let new_table_fragment_infos =
752 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
753 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
754 {
755 let table_fragments = &info.stream_job_fragments;
756 let mut table_ids: HashSet<_> =
757 table_fragments.internal_table_ids().into_iter().collect();
758 if let Some(mv_table_id) = table_fragments.mv_table_id() {
759 table_ids.insert(mv_table_id);
760 }
761
762 vec![NewTableFragmentInfo { table_ids }]
763 } else {
764 vec![]
765 };
766
767 let mut mv_log_store_truncate_epoch = HashMap::new();
768 let mut update_truncate_epoch =
770 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
771 Entry::Occupied(mut entry) => {
772 let prev_truncate_epoch = entry.get_mut();
773 if truncate_epoch < *prev_truncate_epoch {
774 *prev_truncate_epoch = truncate_epoch;
775 }
776 }
777 Entry::Vacant(entry) => {
778 entry.insert(truncate_epoch);
779 }
780 };
781 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
782 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
783 update_truncate_epoch(*mv_table_id, truncate_epoch);
784 }
785 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
786 for mv_table_id in upstream_mv_table_ids {
787 update_truncate_epoch(mv_table_id, backfill_epoch);
788 }
789 }
790
791 let table_new_change_log = build_table_change_log_delta(
792 old_value_ssts.into_iter(),
793 synced_ssts.iter().map(|sst| &sst.sst_info),
794 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
795 mv_log_store_truncate_epoch.into_iter(),
796 );
797
798 let epoch = self.barrier_info.prev_epoch();
799 for table_id in &self.table_ids_to_commit {
800 info.tables_to_commit
801 .try_insert(*table_id, epoch)
802 .expect("non duplicate");
803 }
804
805 info.sstables.extend(synced_ssts);
806 info.new_table_watermarks.extend(new_table_watermarks);
807 info.sst_to_context.extend(sst_to_context);
808 info.new_table_fragment_infos
809 .extend(new_table_fragment_infos);
810 info.change_log_delta.extend(table_new_change_log);
811 for (table_id, vector_index_adds) in vector_index_adds {
812 info.vector_index_delta
813 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
814 .expect("non-duplicate");
815 }
816 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
817 for fragment in job_info.stream_job_fragments.fragments.values() {
818 visit_stream_node_cont(&fragment.nodes, |node| {
819 match node.node_body.as_ref().unwrap() {
820 NodeBody::VectorIndexWrite(vector_index_write) => {
821 let index_table = vector_index_write.table.as_ref().unwrap();
822 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
823 info.vector_index_delta
824 .try_insert(
825 index_table.id,
826 VectorIndexDelta::Init(PbVectorIndexInit {
827 info: Some(index_table.vector_index_info.unwrap()),
828 }),
829 )
830 .expect("non-duplicate");
831 false
832 }
833 _ => true,
834 }
835 })
836 }
837 }
838 info.truncate_tables.extend(truncate_tables);
839 }
840}
841
842impl Command {
843 pub(super) fn to_mutation(
847 &self,
848 is_currently_paused: bool,
849 edges: &mut Option<FragmentEdgeBuildResult>,
850 control_stream_manager: &ControlStreamManager,
851 database_info: &mut InflightDatabaseInfo,
852 ) -> MetaResult<Option<Mutation>> {
853 let mutation = match self {
854 Command::Flush => None,
855
856 Command::Pause => {
857 if !is_currently_paused {
860 Some(Mutation::Pause(PauseMutation {}))
861 } else {
862 None
863 }
864 }
865
866 Command::Resume => {
867 if is_currently_paused {
869 Some(Mutation::Resume(ResumeMutation {}))
870 } else {
871 None
872 }
873 }
874
875 Command::SourceChangeSplit(SplitState {
876 split_assignment, ..
877 }) => {
878 let mut diff = HashMap::new();
879
880 for actor_splits in split_assignment.values() {
881 diff.extend(actor_splits.clone());
882 }
883
884 Some(Mutation::Splits(SourceChangeSplitMutation {
885 actor_splits: build_actor_connector_splits(&diff),
886 }))
887 }
888
889 Command::Throttle(config) => {
890 let mut actor_to_apply = HashMap::new();
891 for per_fragment in config.values() {
892 actor_to_apply.extend(
893 per_fragment
894 .iter()
895 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
896 );
897 }
898
899 Some(Mutation::Throttle(ThrottleMutation {
900 actor_throttle: actor_to_apply,
901 }))
902 }
903
904 Command::DropStreamingJobs {
905 actors,
906 dropped_sink_fragment_by_targets,
907 ..
908 } => Some(Mutation::Stop(StopMutation {
909 actors: actors.clone(),
910 dropped_sink_fragments: dropped_sink_fragment_by_targets
911 .values()
912 .flatten()
913 .cloned()
914 .collect(),
915 })),
916
917 Command::CreateStreamingJob {
918 info:
919 CreateStreamingJobCommandInfo {
920 stream_job_fragments,
921 init_split_assignment: split_assignment,
922 upstream_fragment_downstreams,
923 fragment_backfill_ordering,
924 ..
925 },
926 job_type,
927 ..
928 } => {
929 let edges = edges.as_mut().expect("should exist");
930 let added_actors = stream_job_fragments.actor_ids().collect();
931 let actor_splits = split_assignment
932 .values()
933 .flat_map(build_actor_connector_splits)
934 .collect();
935 let subscriptions_to_add =
936 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
937 job_type
938 {
939 snapshot_backfill_info
940 .upstream_mv_table_id_to_backfill_epoch
941 .keys()
942 .map(|table_id| SubscriptionUpstreamInfo {
943 subscriber_id: stream_job_fragments
944 .stream_job_id()
945 .as_subscriber_id(),
946 upstream_mv_table_id: *table_id,
947 })
948 .collect()
949 } else {
950 Default::default()
951 };
952 let backfill_nodes_to_pause: Vec<_> =
953 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
954 .into_iter()
955 .collect();
956
957 let new_upstream_sinks =
958 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
959 sink_fragment_id,
960 sink_output_fields,
961 project_exprs,
962 new_sink_downstream,
963 ..
964 }) = job_type
965 {
966 let new_sink_actors = stream_job_fragments
967 .actors_to_create()
968 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
969 .exactly_one()
970 .map(|(_, _, actors)| {
971 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
972 actor_id: actor.actor_id,
973 host: Some(control_stream_manager.host_addr(worker_id)),
974 })
975 })
976 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
977 let new_upstream_sink = PbNewUpstreamSink {
978 info: Some(PbUpstreamSinkInfo {
979 upstream_fragment_id: *sink_fragment_id,
980 sink_output_schema: sink_output_fields.clone(),
981 project_exprs: project_exprs.clone(),
982 }),
983 upstream_actors: new_sink_actors.collect(),
984 };
985 HashMap::from([(
986 new_sink_downstream.downstream_fragment_id,
987 new_upstream_sink,
988 )])
989 } else {
990 HashMap::new()
991 };
992
993 let actor_cdc_table_snapshot_splits =
994 if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
995 database_info
996 .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
997 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
998 } else {
999 None
1000 };
1001
1002 let add_mutation = AddMutation {
1003 actor_dispatchers: edges
1004 .dispatchers
1005 .extract_if(|fragment_id, _| {
1006 upstream_fragment_downstreams.contains_key(fragment_id)
1007 })
1008 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1009 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1010 .collect(),
1011 added_actors,
1012 actor_splits,
1013 pause: is_currently_paused,
1015 subscriptions_to_add,
1016 backfill_nodes_to_pause,
1017 actor_cdc_table_snapshot_splits,
1018 new_upstream_sinks,
1019 };
1020
1021 Some(Mutation::Add(add_mutation))
1022 }
1023
1024 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1025 old_fragments,
1026 replace_upstream,
1027 upstream_fragment_downstreams,
1028 init_split_assignment,
1029 auto_refresh_schema_sinks,
1030 ..
1031 }) => {
1032 let edges = edges.as_mut().expect("should exist");
1033 let merge_updates = edges
1034 .merge_updates
1035 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1036 .collect();
1037 let dispatchers = edges
1038 .dispatchers
1039 .extract_if(|fragment_id, _| {
1040 upstream_fragment_downstreams.contains_key(fragment_id)
1041 })
1042 .collect();
1043 let actor_cdc_table_snapshot_splits = database_info
1044 .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1045 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1046 Self::generate_update_mutation_for_replace_table(
1047 old_fragments.actor_ids().chain(
1048 auto_refresh_schema_sinks
1049 .as_ref()
1050 .into_iter()
1051 .flat_map(|sinks| {
1052 sinks.iter().flat_map(|sink| {
1053 sink.original_fragment
1054 .actors
1055 .iter()
1056 .map(|actor| actor.actor_id)
1057 })
1058 }),
1059 ),
1060 merge_updates,
1061 dispatchers,
1062 init_split_assignment,
1063 actor_cdc_table_snapshot_splits,
1064 auto_refresh_schema_sinks.as_ref(),
1065 )
1066 }
1067
1068 Command::RescheduleFragment {
1069 reschedules,
1070 fragment_actors,
1071 ..
1072 } => {
1073 let mut dispatcher_update = HashMap::new();
1074 for reschedule in reschedules.values() {
1075 for &(upstream_fragment_id, dispatcher_id) in
1076 &reschedule.upstream_fragment_dispatcher_ids
1077 {
1078 let upstream_actor_ids = fragment_actors
1080 .get(&upstream_fragment_id)
1081 .expect("should contain");
1082
1083 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1084
1085 for &actor_id in upstream_actor_ids {
1087 let added_downstream_actor_id = if upstream_reschedule
1088 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1089 .unwrap_or(true)
1090 {
1091 reschedule
1092 .added_actors
1093 .values()
1094 .flatten()
1095 .cloned()
1096 .collect()
1097 } else {
1098 Default::default()
1099 };
1100 dispatcher_update
1102 .try_insert(
1103 (actor_id, dispatcher_id),
1104 DispatcherUpdate {
1105 actor_id,
1106 dispatcher_id,
1107 hash_mapping: reschedule
1108 .upstream_dispatcher_mapping
1109 .as_ref()
1110 .map(|m| m.to_protobuf()),
1111 added_downstream_actor_id,
1112 removed_downstream_actor_id: reschedule
1113 .removed_actors
1114 .iter()
1115 .cloned()
1116 .collect(),
1117 },
1118 )
1119 .unwrap();
1120 }
1121 }
1122 }
1123 let dispatcher_update = dispatcher_update.into_values().collect();
1124
1125 let mut merge_update = HashMap::new();
1126 for (&fragment_id, reschedule) in reschedules {
1127 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1128 let downstream_actor_ids = fragment_actors
1130 .get(&downstream_fragment_id)
1131 .expect("should contain");
1132
1133 let downstream_removed_actors: HashSet<_> = reschedules
1137 .get(&downstream_fragment_id)
1138 .map(|downstream_reschedule| {
1139 downstream_reschedule
1140 .removed_actors
1141 .iter()
1142 .copied()
1143 .collect()
1144 })
1145 .unwrap_or_default();
1146
1147 for &actor_id in downstream_actor_ids {
1149 if downstream_removed_actors.contains(&actor_id) {
1150 continue;
1151 }
1152
1153 merge_update
1155 .try_insert(
1156 (actor_id, fragment_id),
1157 MergeUpdate {
1158 actor_id,
1159 upstream_fragment_id: fragment_id,
1160 new_upstream_fragment_id: None,
1161 added_upstream_actors: reschedule
1162 .added_actors
1163 .iter()
1164 .flat_map(|(worker_id, actors)| {
1165 let host =
1166 control_stream_manager.host_addr(*worker_id);
1167 actors.iter().map(move |&actor_id| PbActorInfo {
1168 actor_id,
1169 host: Some(host.clone()),
1170 })
1171 })
1172 .collect(),
1173 removed_upstream_actor_id: reschedule
1174 .removed_actors
1175 .iter()
1176 .cloned()
1177 .collect(),
1178 },
1179 )
1180 .unwrap();
1181 }
1182 }
1183 }
1184 let merge_update = merge_update.into_values().collect();
1185
1186 let mut actor_vnode_bitmap_update = HashMap::new();
1187 for reschedule in reschedules.values() {
1188 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1190 let bitmap = bitmap.to_protobuf();
1191 actor_vnode_bitmap_update
1192 .try_insert(actor_id, bitmap)
1193 .unwrap();
1194 }
1195 }
1196 let dropped_actors = reschedules
1197 .values()
1198 .flat_map(|r| r.removed_actors.iter().copied())
1199 .collect();
1200 let mut actor_splits = HashMap::new();
1201 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1202 for (fragment_id, reschedule) in reschedules {
1203 for (actor_id, splits) in &reschedule.actor_splits {
1204 actor_splits.insert(
1205 *actor_id,
1206 ConnectorSplits {
1207 splits: splits.iter().map(ConnectorSplit::from).collect(),
1208 },
1209 );
1210 }
1211
1212 if let Some(assignment) =
1213 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1214 {
1215 actor_cdc_table_snapshot_splits.extend(assignment)
1216 }
1217 }
1218
1219 let actor_new_dispatchers = HashMap::new();
1221 let mutation = Mutation::Update(UpdateMutation {
1222 dispatcher_update,
1223 merge_update,
1224 actor_vnode_bitmap_update,
1225 dropped_actors,
1226 actor_splits,
1227 actor_new_dispatchers,
1228 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1229 splits: actor_cdc_table_snapshot_splits,
1230 }),
1231 sink_schema_change: Default::default(),
1232 subscriptions_to_drop: vec![],
1233 });
1234 tracing::debug!("update mutation: {mutation:?}");
1235 Some(mutation)
1236 }
1237
1238 Command::CreateSubscription {
1239 upstream_mv_table_id,
1240 subscription_id,
1241 ..
1242 } => Some(Mutation::Add(AddMutation {
1243 actor_dispatchers: Default::default(),
1244 added_actors: vec![],
1245 actor_splits: Default::default(),
1246 pause: false,
1247 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1248 upstream_mv_table_id: *upstream_mv_table_id,
1249 subscriber_id: subscription_id.as_subscriber_id(),
1250 }],
1251 backfill_nodes_to_pause: vec![],
1252 actor_cdc_table_snapshot_splits: None,
1253 new_upstream_sinks: Default::default(),
1254 })),
1255 Command::DropSubscription {
1256 upstream_mv_table_id,
1257 subscription_id,
1258 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1259 info: vec![SubscriptionUpstreamInfo {
1260 subscriber_id: subscription_id.as_subscriber_id(),
1261 upstream_mv_table_id: *upstream_mv_table_id,
1262 }],
1263 })),
1264 Command::ConnectorPropsChange(config) => {
1265 let mut connector_props_infos = HashMap::default();
1266 for (k, v) in config {
1267 connector_props_infos.insert(
1268 k.as_raw_id(),
1269 ConnectorPropsInfo {
1270 connector_props_info: v.clone(),
1271 },
1272 );
1273 }
1274 Some(Mutation::ConnectorPropsChange(
1275 ConnectorPropsChangeMutation {
1276 connector_props_infos,
1277 },
1278 ))
1279 }
1280 Command::Refresh {
1281 table_id,
1282 associated_source_id,
1283 } => Some(Mutation::RefreshStart(
1284 risingwave_pb::stream_plan::RefreshStartMutation {
1285 table_id: *table_id,
1286 associated_source_id: *associated_source_id,
1287 },
1288 )),
1289 Command::ListFinish {
1290 table_id: _,
1291 associated_source_id,
1292 } => Some(Mutation::ListFinish(ListFinishMutation {
1293 associated_source_id: *associated_source_id,
1294 })),
1295 Command::LoadFinish {
1296 table_id: _,
1297 associated_source_id,
1298 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1299 associated_source_id: *associated_source_id,
1300 })),
1301 };
1302 Ok(mutation)
1303 }
1304
1305 pub(super) fn actors_to_create(
1306 &self,
1307 database_info: &InflightDatabaseInfo,
1308 edges: &mut Option<FragmentEdgeBuildResult>,
1309 control_stream_manager: &ControlStreamManager,
1310 ) -> Option<StreamJobActorsToCreate> {
1311 match self {
1312 Command::CreateStreamingJob { info, job_type, .. } => {
1313 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1314 return None;
1316 }
1317 let actors_to_create = info.stream_job_fragments.actors_to_create();
1318 let edges = edges.as_mut().expect("should exist");
1319 Some(edges.collect_actors_to_create(actors_to_create.map(
1320 |(fragment_id, node, actors)| {
1321 (
1322 fragment_id,
1323 node,
1324 actors,
1325 [], )
1327 },
1328 )))
1329 }
1330 Command::RescheduleFragment {
1331 reschedules,
1332 fragment_actors,
1333 ..
1334 } => {
1335 let mut actor_upstreams = Self::collect_actor_upstreams(
1336 reschedules.iter().map(|(fragment_id, reschedule)| {
1337 (
1338 *fragment_id,
1339 reschedule.newly_created_actors.values().map(
1340 |((actor, dispatchers), _)| {
1341 (actor.actor_id, dispatchers.as_slice())
1342 },
1343 ),
1344 )
1345 }),
1346 Some((reschedules, fragment_actors)),
1347 database_info,
1348 control_stream_manager,
1349 );
1350 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1351 for (fragment_id, (actor, dispatchers), worker_id) in
1352 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1353 reschedule
1354 .newly_created_actors
1355 .values()
1356 .map(|(actors, status)| (*fragment_id, actors, status))
1357 })
1358 {
1359 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1360 map.entry(*worker_id)
1361 .or_default()
1362 .entry(fragment_id)
1363 .or_insert_with(|| {
1364 let node = database_info.fragment(fragment_id).nodes.clone();
1365 let subscribers =
1366 database_info.fragment_subscribers(fragment_id).collect();
1367 (node, vec![], subscribers)
1368 })
1369 .1
1370 .push((actor.clone(), upstreams, dispatchers.clone()));
1371 }
1372 Some(map)
1373 }
1374 Command::ReplaceStreamJob(replace_table) => {
1375 let edges = edges.as_mut().expect("should exist");
1376 let mut actors = edges.collect_actors_to_create(
1377 replace_table.new_fragments.actors_to_create().map(
1378 |(fragment_id, node, actors)| {
1379 (
1380 fragment_id,
1381 node,
1382 actors,
1383 database_info
1384 .job_subscribers(replace_table.old_fragments.stream_job_id),
1385 )
1386 },
1387 ),
1388 );
1389 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1390 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1391 (
1392 sink.new_fragment.fragment_id,
1393 &sink.new_fragment.nodes,
1394 sink.new_fragment.actors.iter().map(|actor| {
1395 (
1396 actor,
1397 sink.actor_status[&actor.actor_id]
1398 .location
1399 .as_ref()
1400 .unwrap()
1401 .worker_node_id,
1402 )
1403 }),
1404 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1405 )
1406 }));
1407 for (worker_id, fragment_actors) in sink_actors {
1408 actors.entry(worker_id).or_default().extend(fragment_actors);
1409 }
1410 }
1411 Some(actors)
1412 }
1413 _ => None,
1414 }
1415 }
1416
1417 fn generate_update_mutation_for_replace_table(
1418 dropped_actors: impl IntoIterator<Item = ActorId>,
1419 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1420 dispatchers: FragmentActorDispatchers,
1421 init_split_assignment: &SplitAssignment,
1422 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1423 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1424 ) -> Option<Mutation> {
1425 let dropped_actors = dropped_actors.into_iter().collect();
1426
1427 let actor_new_dispatchers = dispatchers
1428 .into_values()
1429 .flatten()
1430 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1431 .collect();
1432
1433 let actor_splits = init_split_assignment
1434 .values()
1435 .flat_map(build_actor_connector_splits)
1436 .collect();
1437 Some(Mutation::Update(UpdateMutation {
1438 actor_new_dispatchers,
1439 merge_update: merge_updates.into_values().flatten().collect(),
1440 dropped_actors,
1441 actor_splits,
1442 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1443 sink_schema_change: auto_refresh_schema_sinks
1444 .as_ref()
1445 .into_iter()
1446 .flat_map(|sinks| {
1447 sinks.iter().map(|sink| {
1448 (
1449 sink.original_sink.id.as_raw_id(),
1450 PbSinkSchemaChange {
1451 original_schema: sink
1452 .original_sink
1453 .columns
1454 .iter()
1455 .map(|col| PbField {
1456 data_type: Some(
1457 col.column_desc
1458 .as_ref()
1459 .unwrap()
1460 .column_type
1461 .as_ref()
1462 .unwrap()
1463 .clone(),
1464 ),
1465 name: col.column_desc.as_ref().unwrap().name.clone(),
1466 })
1467 .collect(),
1468 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1469 fields: sink
1470 .newly_add_fields
1471 .iter()
1472 .map(|field| field.to_prost())
1473 .collect(),
1474 })),
1475 },
1476 )
1477 })
1478 })
1479 .collect(),
1480 ..Default::default()
1481 }))
1482 }
1483
1484 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1486 match self {
1487 Command::DropStreamingJobs {
1488 streaming_job_ids, ..
1489 } => Some(streaming_job_ids.iter().cloned()),
1490 _ => None,
1491 }
1492 .into_iter()
1493 .flatten()
1494 }
1495}
1496
1497impl Command {
1498 #[expect(clippy::type_complexity)]
1499 pub(super) fn collect_actor_upstreams(
1500 actor_dispatchers: impl Iterator<
1501 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1502 >,
1503 reschedule_dispatcher_update: Option<(
1504 &HashMap<FragmentId, Reschedule>,
1505 &HashMap<FragmentId, HashSet<ActorId>>,
1506 )>,
1507 database_info: &InflightDatabaseInfo,
1508 control_stream_manager: &ControlStreamManager,
1509 ) -> HashMap<ActorId, ActorUpstreams> {
1510 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1511 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1512 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1513 for (upstream_actor_id, dispatchers) in upstream_actors {
1514 let upstream_actor_location =
1515 upstream_fragment.actors[&upstream_actor_id].worker_id;
1516 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1517 for downstream_actor_id in dispatchers
1518 .iter()
1519 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1520 {
1521 actor_upstreams
1522 .entry(*downstream_actor_id)
1523 .or_default()
1524 .entry(upstream_fragment_id)
1525 .or_default()
1526 .insert(
1527 upstream_actor_id,
1528 PbActorInfo {
1529 actor_id: upstream_actor_id,
1530 host: Some(upstream_actor_host.clone()),
1531 },
1532 );
1533 }
1534 }
1535 }
1536 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1537 for reschedule in reschedules.values() {
1538 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1539 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1540 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1541 for upstream_actor_id in fragment_actors
1542 .get(upstream_fragment_id)
1543 .expect("should exist")
1544 {
1545 let upstream_actor_location =
1546 upstream_fragment.actors[upstream_actor_id].worker_id;
1547 let upstream_actor_host =
1548 control_stream_manager.host_addr(upstream_actor_location);
1549 if let Some(upstream_reschedule) = upstream_reschedule
1550 && upstream_reschedule
1551 .removed_actors
1552 .contains(upstream_actor_id)
1553 {
1554 continue;
1555 }
1556 for (_, downstream_actor_id) in
1557 reschedule
1558 .added_actors
1559 .iter()
1560 .flat_map(|(worker_id, actors)| {
1561 actors.iter().map(|actor| (*worker_id, *actor))
1562 })
1563 {
1564 actor_upstreams
1565 .entry(downstream_actor_id)
1566 .or_default()
1567 .entry(*upstream_fragment_id)
1568 .or_default()
1569 .insert(
1570 *upstream_actor_id,
1571 PbActorInfo {
1572 actor_id: *upstream_actor_id,
1573 host: Some(upstream_actor_host.clone()),
1574 },
1575 );
1576 }
1577 }
1578 }
1579 }
1580 }
1581 actor_upstreams
1582 }
1583}