use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::iter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
    JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
    build_actor_connector_splits,
};

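/// The changes a [`Command::RescheduleFragment`] makes to a single fragment: which actors
/// are added or removed, how vnode bitmaps and source splits are reassigned, and which
/// upstream/downstream fragments must update their dispatchers and merge executors.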
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Updated vnode bitmaps for actors that stay in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers in upstream fragments that point to this fragment, as `(fragment id, dispatcher id)`.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for the upstream dispatchers, if the distribution changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors need to be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned source splits for actors in this fragment.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Definitions of the newly created actors, together with the worker they are scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    /// Reassigned CDC table snapshot splits for actors in this fragment.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    /// The CDC table id, if this fragment backfills from a CDC table.
    pub cdc_table_id: Option<u32>,
}

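/// The plan for replacing an existing streaming job, used by [`Command::ReplaceStreamJob`]:
/// the new fragments are created and the old ones dropped in the same barrier, with
/// downstream fragments rewired according to `replace_upstream`.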
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub tmp_id: u32,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl ReplaceStreamJobPlan {
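    /// Returns the per-fragment changes applied to the inflight graph: new fragments to
    /// add, old fragments to remove, and fragments whose upstream nodes are replaced.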
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id().into(),
                info: new_fragment,
                is_existing: false,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: TableId::new(sink.original_sink.id as _),
                    info: sink.new_fragment_info(),
                    is_existing: false,
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

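    /// Maps each replaced fragment id to the id of the fragment that replaces it, asserting
    /// that a fragment is never replaced by more than one fragment.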
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

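/// The information needed to build a [`Command::CreateStreamingJob`]: the fragments to
/// create, their initial split assignment, and metadata of the streaming job itself.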
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl StreamJobFragments {
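    /// Builds the inflight fragment info for every fragment of this job, attaching each
    /// actor's worker, vnode bitmap and initially assigned source splits.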
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

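/// Information for snapshot backfill: the upstream MV tables to backfill from, mapped to
/// the backfill epoch (if already determined).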
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

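/// How a streaming job is created: as a plain job, as a sink into an existing table, or
/// via snapshot backfill from upstream materialized views.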
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

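/// [`Command`] is the action the barrier manager attaches to a barrier. It is translated
/// into a [`Mutation`] that travels with the barrier through the streaming graph, and its
/// effects on the inflight graph are applied on the meta side once the barrier is collected.
///
/// A minimal sketch (illustrative only) of the convenience constructors:
///
/// ```ignore
/// let cmd = Command::pause();
/// // Every command except `Resume` requires the barrier to be a checkpoint.
/// assert!(cmd.need_checkpoint());
/// ```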
#[derive(Debug)]
pub enum Command {
    Flush,

    Pause,

    Resume,

    DropStreamingJobs {
        streaming_job_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    ReplaceStreamJob(ReplaceStreamJobPlan),

    SourceChangeSplit(SplitState),

    Throttle(ThrottleConfig),

    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

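    /// Returns the changes this command applies to the inflight fragment graph, keyed by
    /// fragment id, or `None` if the command does not change the graph topology.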
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some(changes)
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id().into(),
                                info: fragment_info,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some(
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
            Command::Refresh { .. } => None,
            Command::LoadFinish { .. } => None,
        }
    }

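    /// Whether this command requires the barrier to be a checkpoint barrier.
    /// Currently every command except `Resume` does.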
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

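/// The kind of barrier to send, mirroring [`PbBarrierKind`]. `Checkpoint` carries the
/// epochs it covers, which are later used when building the table change log delta.
///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![100]);
/// assert!(kind.is_checkpoint());
/// assert_eq!(kind.as_str_name(), "Checkpoint");
/// ```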
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

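/// [`CommandContext`] bundles everything needed to complete a barrier: the barrier info,
/// the command it carries (if any), the in-flight subscription info, and the state tables
/// whose epoch must be committed when the barrier is collected.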
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

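    /// Computes the log-store truncate epoch for a subscription with the given retention:
    /// the previous epoch's timestamp minus `retention_second`. Falls back to the previous
    /// epoch itself if the subtraction produces an out-of-range timestamp.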
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

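    /// Collects everything that must be committed to hummock for this barrier: synced SSTs
    /// and watermarks from the barrier-complete responses, new table fragment infos for a
    /// newly created job, log-store truncate epochs derived from subscriptions and pinned
    /// backfill epochs, and vector index deltas.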
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id.into(),
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
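    /// Generates the barrier [`Mutation`] to broadcast for this command, or `None` if the
    /// command carries no mutation (e.g. `Flush`, or `Pause`/`Resume` when the pause state
    /// would not change).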
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        cdc_table_snapshot_split_assignment,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = table_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink.clone(),
                        )])
                    } else {
                        HashMap::new()
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits:
                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
                            cdc_table_snapshot_split_assignment.clone(),
                        )
                        .into(),
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                cdc_table_snapshot_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let cdc_table_snapshot_split_assignment =
                    if cdc_table_snapshot_split_assignment.is_empty() {
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                    } else {
                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
                            cdc_table_snapshot_split_assignment.clone(),
                            control_stream_manager
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
                        )
                    };
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().into_iter().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    cdc_table_snapshot_split_assignment,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| PbActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                let mut cdc_table_ids: HashSet<_> = HashSet::default();
                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                    actor_cdc_table_snapshot_splits.extend(
                        build_pb_actor_cdc_table_snapshot_splits(
                            reschedule.cdc_table_snapshot_split_assignment.clone(),
                        ),
                    );
                    if let Some(cdc_table_id) = reschedule.cdc_table_id {
                        cdc_table_ids.insert(cdc_table_id);
                    }
                }

                let actor_new_dispatchers = HashMap::new();
                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
                {
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
                    )
                    .into()
                } else {
                    PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                        generation: control_stream_manager
                            .env
                            .cdc_table_backfill_tracker
                            .next_generation(cdc_table_ids.into_iter()),
                    }
                    .into()
                };
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                    sink_add_columns: Default::default(),
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

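    /// Returns the new actors to create on each worker for this command, keyed by worker id
    /// and fragment id. Only `CreateStreamingJob` (except snapshot backfill),
    /// `RescheduleFragment` and `ReplaceStreamJob` create actors.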
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

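    /// Builds the `Update` mutation that drops the old actors, rewires merges and
    /// dispatchers to the new fragments, and carries the new split and CDC snapshot-split
    /// assignments, plus newly added sink columns for auto-refresh-schema sinks.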
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    cdc_table_snapshot_split_assignment,
                )
                .into(),
            sink_add_columns: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id,
                            PbSinkAddColumns {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

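    /// Returns the ids of the streaming jobs this command drops (empty for every command
    /// other than `DropStreamingJobs`).
    ///
    /// ```ignore
    /// // Illustrative sketch only: non-drop commands yield no job ids.
    /// assert!(Command::pause().jobs_to_drop().next().is_none());
    /// ```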
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
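    /// Collects, for every downstream actor, the upstream actors (with their host
    /// addresses) that dispatch to it, based on the given dispatchers and, for reschedules,
    /// the newly added downstream actors of each upstream fragment.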
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}