1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::SplitImpl;
29use risingwave_connector::source::cdc::{
30 CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
31 build_pb_actor_cdc_table_snapshot_splits,
32 build_pb_actor_cdc_table_snapshot_splits_with_generation,
33};
34use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
35use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::catalog::CreateType;
38use risingwave_pb::catalog::table::PbTableType;
39use risingwave_pb::common::PbActorInfo;
40use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
41use risingwave_pb::source::{
42 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
43};
44use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
45use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
46use risingwave_pb::stream_plan::barrier_mutation::Mutation;
47use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
48use risingwave_pb::stream_plan::stream_node::NodeBody;
49use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
50use risingwave_pb::stream_plan::update_mutation::*;
51use risingwave_pb::stream_plan::{
52 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
53 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
54 ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
55 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
56};
57use risingwave_pb::stream_service::BarrierCompleteResponse;
58use tracing::warn;
59
60use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
61use crate::barrier::InflightSubscriptionInfo;
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73 StreamJobFragments, StreamJobFragmentsToCreate,
74};
75use crate::stream::{
76 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
77 JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
78 build_actor_connector_splits,
79};
80
81#[derive(Debug, Clone)]
84pub struct Reschedule {
85 pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
87
88 pub removed_actors: HashSet<ActorId>,
90
91 pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
93
94 pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
96 pub upstream_dispatcher_mapping: Option<ActorMapping>,
101
102 pub downstream_fragment_ids: Vec<FragmentId>,
104
105 pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
109
110 pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
111
112 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
113
114 pub cdc_table_id: Option<u32>,
115}
116
117#[derive(Debug, Clone)]
124pub struct ReplaceStreamJobPlan {
125 pub old_fragments: StreamJobFragments,
126 pub new_fragments: StreamJobFragmentsToCreate,
127 pub replace_upstream: FragmentReplaceUpstream,
130 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
131 pub init_split_assignment: SplitAssignment,
137 pub streaming_job: StreamingJob,
139 pub tmp_id: u32,
141 pub to_drop_state_table_ids: Vec<TableId>,
143 pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
144 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
145}
146
147impl ReplaceStreamJobPlan {
148 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
149 let mut fragment_changes = HashMap::new();
150 for (fragment_id, new_fragment) in self
151 .new_fragments
152 .new_fragment_info(&self.init_split_assignment)
153 {
154 let fragment_change = CommandFragmentChanges::NewFragment {
155 job_id: self.streaming_job.id().into(),
156 info: new_fragment,
157 is_existing: false,
158 };
159 fragment_changes
160 .try_insert(fragment_id, fragment_change)
161 .expect("non-duplicate");
162 }
163 for fragment in self.old_fragments.fragments.values() {
164 fragment_changes
165 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
166 .expect("non-duplicate");
167 }
168 for (fragment_id, replace_map) in &self.replace_upstream {
169 fragment_changes
170 .try_insert(
171 *fragment_id,
172 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
173 )
174 .expect("non-duplicate");
175 }
176 if let Some(sinks) = &self.auto_refresh_schema_sinks {
177 for sink in sinks {
178 let fragment_change = CommandFragmentChanges::NewFragment {
179 job_id: TableId::new(sink.original_sink.id as _),
180 info: sink.new_fragment_info(),
181 is_existing: false,
182 };
183 fragment_changes
184 .try_insert(sink.new_fragment.fragment_id, fragment_change)
185 .expect("non-duplicate");
186 fragment_changes
187 .try_insert(
188 sink.original_fragment.fragment_id,
189 CommandFragmentChanges::RemoveFragment,
190 )
191 .expect("non-duplicate");
192 }
193 }
194 fragment_changes
195 }
196
197 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
199 let mut fragment_replacements = HashMap::new();
200 for (upstream_fragment_id, new_upstream_fragment_id) in
201 self.replace_upstream.values().flatten()
202 {
203 {
204 let r =
205 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
206 if let Some(r) = r {
207 assert_eq!(
208 *new_upstream_fragment_id, r,
209 "one fragment is replaced by multiple fragments"
210 );
211 }
212 }
213 }
214 fragment_replacements
215 }
216}
217
218#[derive(educe::Educe, Clone)]
219#[educe(Debug)]
220pub struct CreateStreamingJobCommandInfo {
221 #[educe(Debug(ignore))]
222 pub stream_job_fragments: StreamJobFragmentsToCreate,
223 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
224 pub init_split_assignment: SplitAssignment,
225 pub definition: String,
226 pub job_type: StreamingJobType,
227 pub create_type: CreateType,
228 pub streaming_job: StreamingJob,
229 pub fragment_backfill_ordering: FragmentBackfillOrder,
230 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
231 pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
232}
233
234impl StreamJobFragments {
235 pub(super) fn new_fragment_info<'a>(
236 &'a self,
237 assignment: &'a SplitAssignment,
238 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
239 self.fragments.values().map(|fragment| {
240 let mut fragment_splits = assignment
241 .get(&fragment.fragment_id)
242 .cloned()
243 .unwrap_or_default();
244
245 (
246 fragment.fragment_id,
247 InflightFragmentInfo {
248 fragment_id: fragment.fragment_id,
249 distribution_type: fragment.distribution_type.into(),
250 fragment_type_mask: fragment.fragment_type_mask,
251 vnode_count: fragment.vnode_count(),
252 nodes: fragment.nodes.clone(),
253 actors: fragment
254 .actors
255 .iter()
256 .map(|actor| {
257 (
258 actor.actor_id,
259 InflightActorInfo {
260 worker_id: self
261 .actor_status
262 .get(&actor.actor_id)
263 .expect("should exist")
264 .worker_id()
265 as WorkerId,
266 vnode_bitmap: actor.vnode_bitmap.clone(),
267 splits: fragment_splits
268 .remove(&actor.actor_id)
269 .unwrap_or_default(),
270 },
271 )
272 })
273 .collect(),
274 state_table_ids: fragment
275 .state_table_ids
276 .iter()
277 .map(|table_id| TableId::new(*table_id))
278 .collect(),
279 },
280 )
281 })
282 }
283}
284
285#[derive(Debug, Clone)]
286pub struct SnapshotBackfillInfo {
287 pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
291}
292
293#[derive(Debug, Clone)]
294pub enum CreateStreamingJobType {
295 Normal,
296 SinkIntoTable(UpstreamSinkInfo),
297 SnapshotBackfill(SnapshotBackfillInfo),
298}
299
300#[derive(Debug)]
305pub enum Command {
306 Flush,
309
310 Pause,
313
314 Resume,
318
319 DropStreamingJobs {
327 streaming_job_ids: HashSet<TableId>,
328 actors: Vec<ActorId>,
329 unregistered_state_table_ids: HashSet<TableId>,
330 unregistered_fragment_ids: HashSet<FragmentId>,
331 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
333 },
334
335 CreateStreamingJob {
345 info: CreateStreamingJobCommandInfo,
346 job_type: CreateStreamingJobType,
347 cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
348 },
349 MergeSnapshotBackfillStreamingJobs(
350 HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
351 ),
352
353 RescheduleFragment {
359 reschedules: HashMap<FragmentId, Reschedule>,
360 fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
362 post_updates: JobReschedulePostUpdates,
364 },
365
366 ReplaceStreamJob(ReplaceStreamJobPlan),
373
374 SourceChangeSplit(SplitState),
377
378 Throttle(ThrottleConfig),
381
382 CreateSubscription {
385 subscription_id: u32,
386 upstream_mv_table_id: TableId,
387 retention_second: u64,
388 },
389
390 DropSubscription {
394 subscription_id: u32,
395 upstream_mv_table_id: TableId,
396 },
397
398 ConnectorPropsChange(ConnectorPropsChange),
399
400 StartFragmentBackfill {
402 fragment_ids: Vec<FragmentId>,
403 },
404
405 Refresh {
408 table_id: TableId,
409 associated_source_id: TableId,
410 },
411 ListFinish {
412 table_id: TableId,
413 associated_source_id: TableId,
414 },
415 LoadFinish {
416 table_id: TableId,
417 associated_source_id: TableId,
418 },
419}
420
421impl std::fmt::Display for Command {
423 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
424 match self {
425 Command::Flush => write!(f, "Flush"),
426 Command::Pause => write!(f, "Pause"),
427 Command::Resume => write!(f, "Resume"),
428 Command::DropStreamingJobs {
429 streaming_job_ids, ..
430 } => {
431 write!(
432 f,
433 "DropStreamingJobs: {}",
434 streaming_job_ids.iter().sorted().join(", ")
435 )
436 }
437 Command::CreateStreamingJob { info, .. } => {
438 write!(f, "CreateStreamingJob: {}", info.streaming_job)
439 }
440 Command::MergeSnapshotBackfillStreamingJobs(_) => {
441 write!(f, "MergeSnapshotBackfillStreamingJobs")
442 }
443 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
444 Command::ReplaceStreamJob(plan) => {
445 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
446 }
447 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
448 Command::Throttle(_) => write!(f, "Throttle"),
449 Command::CreateSubscription {
450 subscription_id, ..
451 } => write!(f, "CreateSubscription: {subscription_id}"),
452 Command::DropSubscription {
453 subscription_id, ..
454 } => write!(f, "DropSubscription: {subscription_id}"),
455 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
456 Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
457 Command::Refresh {
458 table_id,
459 associated_source_id,
460 } => write!(
461 f,
462 "Refresh: {} (source: {})",
463 table_id, associated_source_id
464 ),
465 Command::ListFinish {
466 table_id,
467 associated_source_id,
468 } => write!(
469 f,
470 "ListFinish: {} (source: {})",
471 table_id, associated_source_id
472 ),
473 Command::LoadFinish {
474 table_id,
475 associated_source_id,
476 } => write!(
477 f,
478 "LoadFinish: {} (source: {})",
479 table_id, associated_source_id
480 ),
481 }
482 }
483}
484
485impl Command {
486 pub fn pause() -> Self {
487 Self::Pause
488 }
489
490 pub fn resume() -> Self {
491 Self::Resume
492 }
493
494 pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
495 match self {
496 Command::Flush => None,
497 Command::Pause => None,
498 Command::Resume => None,
499 Command::DropStreamingJobs {
500 unregistered_fragment_ids,
501 dropped_sink_fragment_by_targets,
502 ..
503 } => {
504 let changes = unregistered_fragment_ids
505 .iter()
506 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
507 .chain(dropped_sink_fragment_by_targets.iter().map(
508 |(target_fragment, sink_fragments)| {
509 (
510 *target_fragment,
511 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
512 )
513 },
514 ))
515 .collect();
516
517 Some(changes)
518 }
519 Command::CreateStreamingJob { info, job_type, .. } => {
520 assert!(
521 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
522 "should handle fragment changes separately for snapshot backfill"
523 );
524 let mut changes: HashMap<_, _> = info
525 .stream_job_fragments
526 .new_fragment_info(&info.init_split_assignment)
527 .map(|(fragment_id, fragment_info)| {
528 (
529 fragment_id,
530 CommandFragmentChanges::NewFragment {
531 job_id: info.streaming_job.id().into(),
532 info: fragment_info,
533 is_existing: false,
534 },
535 )
536 })
537 .collect();
538
539 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
540 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
541 changes.insert(
542 downstream_fragment_id,
543 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
544 upstream_fragment_id: ctx.sink_fragment_id,
545 sink_output_schema: ctx.sink_output_fields.clone(),
546 project_exprs: ctx.project_exprs.clone(),
547 }),
548 );
549 }
550
551 Some(changes)
552 }
553 Command::RescheduleFragment { reschedules, .. } => Some(
554 reschedules
555 .iter()
556 .map(|(fragment_id, reschedule)| {
557 (
558 *fragment_id,
559 CommandFragmentChanges::Reschedule {
560 new_actors: reschedule
561 .added_actors
562 .iter()
563 .flat_map(|(node_id, actors)| {
564 actors.iter().map(|actor_id| {
565 (
566 *actor_id,
567 InflightActorInfo {
568 worker_id: *node_id,
569 vnode_bitmap: reschedule
570 .newly_created_actors
571 .get(actor_id)
572 .expect("should exist")
573 .0
574 .0
575 .vnode_bitmap
576 .clone(),
577 splits: reschedule
578 .actor_splits
579 .get(actor_id)
580 .cloned()
581 .unwrap_or_default(),
582 },
583 )
584 })
585 })
586 .collect(),
587 actor_update_vnode_bitmap: reschedule
588 .vnode_bitmap_updates
589 .iter()
590 .filter(|(actor_id, _)| {
591 !reschedule.newly_created_actors.contains_key(actor_id)
593 })
594 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
595 .collect(),
596 to_remove: reschedule.removed_actors.iter().cloned().collect(),
597 actor_splits: reschedule.actor_splits.clone(),
598 },
599 )
600 })
601 .collect(),
602 ),
603 Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
604 Command::MergeSnapshotBackfillStreamingJobs(_) => None,
605 Command::SourceChangeSplit(SplitState {
606 split_assignment, ..
607 }) => Some(
608 split_assignment
609 .iter()
610 .map(|(&fragment_id, splits)| {
611 (
612 fragment_id,
613 CommandFragmentChanges::SplitAssignment {
614 actor_splits: splits.clone(),
615 },
616 )
617 })
618 .collect(),
619 ),
620 Command::Throttle(_) => None,
621 Command::CreateSubscription { .. } => None,
622 Command::DropSubscription { .. } => None,
623 Command::ConnectorPropsChange(_) => None,
624 Command::StartFragmentBackfill { .. } => None,
625 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, }
629 }
630
631 pub fn need_checkpoint(&self) -> bool {
632 !matches!(self, Command::Resume)
634 }
635}
636
/// The kind of a barrier, mirroring the protobuf barrier kind, with `Checkpoint` additionally
/// carrying a list of epochs.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The initial barrier.
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// A checkpoint barrier. The carried epochs are handed to the table change-log delta
    /// builder when the checkpoint is committed.
    ///
    /// NOTE(review): presumably the epochs covered since the previous checkpoint; confirm.
    Checkpoint(Vec<u64>),
}
644
645impl BarrierKind {
646 pub fn to_protobuf(&self) -> PbBarrierKind {
647 match self {
648 BarrierKind::Initial => PbBarrierKind::Initial,
649 BarrierKind::Barrier => PbBarrierKind::Barrier,
650 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
651 }
652 }
653
654 pub fn is_checkpoint(&self) -> bool {
655 matches!(self, BarrierKind::Checkpoint(_))
656 }
657
658 pub fn is_initial(&self) -> bool {
659 matches!(self, BarrierKind::Initial)
660 }
661
662 pub fn as_str_name(&self) -> &'static str {
663 match self {
664 BarrierKind::Initial => "Initial",
665 BarrierKind::Barrier => "Barrier",
666 BarrierKind::Checkpoint(_) => "Checkpoint",
667 }
668 }
669}
670
671pub(super) struct CommandContext {
674 subscription_info: InflightSubscriptionInfo,
675
676 pub(super) barrier_info: BarrierInfo,
677
678 pub(super) table_ids_to_commit: HashSet<TableId>,
679
680 pub(super) command: Option<Command>,
681
682 _span: tracing::Span,
688}
689
690impl std::fmt::Debug for CommandContext {
691 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
692 f.debug_struct("CommandContext")
693 .field("barrier_info", &self.barrier_info)
694 .field("command", &self.command)
695 .finish()
696 }
697}
698
699impl std::fmt::Display for CommandContext {
700 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
701 write!(
702 f,
703 "prev_epoch={}, curr_epoch={}, kind={}",
704 self.barrier_info.prev_epoch.value().0,
705 self.barrier_info.curr_epoch.value().0,
706 self.barrier_info.kind.as_str_name()
707 )?;
708 if let Some(command) = &self.command {
709 write!(f, ", command={}", command)?;
710 }
711 Ok(())
712 }
713}
714
715impl CommandContext {
716 pub(super) fn new(
717 barrier_info: BarrierInfo,
718 subscription_info: InflightSubscriptionInfo,
719 table_ids_to_commit: HashSet<TableId>,
720 command: Option<Command>,
721 span: tracing::Span,
722 ) -> Self {
723 Self {
724 subscription_info,
725 barrier_info,
726 table_ids_to_commit,
727 command,
728 _span: span,
729 }
730 }
731
732 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
733 let Some(truncate_timestamptz) = Timestamptz::from_secs(
734 self.barrier_info
735 .prev_epoch
736 .value()
737 .as_timestamptz()
738 .timestamp()
739 - retention_second as i64,
740 ) else {
741 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
742 return self.barrier_info.prev_epoch.value();
743 };
744 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
745 }
746
747 pub(super) fn collect_commit_epoch_info(
748 &self,
749 info: &mut CommitEpochInfo,
750 resps: Vec<BarrierCompleteResponse>,
751 backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
752 ) {
753 let (
754 sst_to_context,
755 synced_ssts,
756 new_table_watermarks,
757 old_value_ssts,
758 vector_index_adds,
759 truncate_tables,
760 ) = collect_resp_info(resps);
761
762 let new_table_fragment_infos =
763 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
764 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
765 {
766 let table_fragments = &info.stream_job_fragments;
767 let mut table_ids: HashSet<_> = table_fragments
768 .internal_table_ids()
769 .into_iter()
770 .map(TableId::new)
771 .collect();
772 if let Some(mv_table_id) = table_fragments.mv_table_id() {
773 table_ids.insert(TableId::new(mv_table_id));
774 }
775
776 vec![NewTableFragmentInfo { table_ids }]
777 } else {
778 vec![]
779 };
780
781 let mut mv_log_store_truncate_epoch = HashMap::new();
782 let mut update_truncate_epoch =
784 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
785 .entry(table_id.table_id)
786 {
787 Entry::Occupied(mut entry) => {
788 let prev_truncate_epoch = entry.get_mut();
789 if truncate_epoch < *prev_truncate_epoch {
790 *prev_truncate_epoch = truncate_epoch;
791 }
792 }
793 Entry::Vacant(entry) => {
794 entry.insert(truncate_epoch);
795 }
796 };
797 for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
798 if let Some(truncate_epoch) = subscriptions
799 .values()
800 .max()
801 .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
802 {
803 update_truncate_epoch(*mv_table_id, truncate_epoch);
804 }
805 }
806 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
807 for mv_table_id in upstream_mv_table_ids {
808 update_truncate_epoch(mv_table_id, backfill_epoch);
809 }
810 }
811
812 let table_new_change_log = build_table_change_log_delta(
813 old_value_ssts.into_iter(),
814 synced_ssts.iter().map(|sst| &sst.sst_info),
815 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
816 mv_log_store_truncate_epoch.into_iter(),
817 );
818
819 let epoch = self.barrier_info.prev_epoch();
820 for table_id in &self.table_ids_to_commit {
821 info.tables_to_commit
822 .try_insert(*table_id, epoch)
823 .expect("non duplicate");
824 }
825
826 info.sstables.extend(synced_ssts);
827 info.new_table_watermarks.extend(new_table_watermarks);
828 info.sst_to_context.extend(sst_to_context);
829 info.new_table_fragment_infos
830 .extend(new_table_fragment_infos);
831 info.change_log_delta.extend(table_new_change_log);
832 for (table_id, vector_index_adds) in vector_index_adds {
833 info.vector_index_delta
834 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
835 .expect("non-duplicate");
836 }
837 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
838 for fragment in job_info.stream_job_fragments.fragments.values() {
839 visit_stream_node_cont(&fragment.nodes, |node| {
840 match node.node_body.as_ref().unwrap() {
841 NodeBody::VectorIndexWrite(vector_index_write) => {
842 let index_table = vector_index_write.table.as_ref().unwrap();
843 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
844 info.vector_index_delta
845 .try_insert(
846 index_table.id.into(),
847 VectorIndexDelta::Init(PbVectorIndexInit {
848 info: Some(index_table.vector_index_info.unwrap()),
849 }),
850 )
851 .expect("non-duplicate");
852 false
853 }
854 _ => true,
855 }
856 })
857 }
858 }
859 info.truncate_tables.extend(truncate_tables);
860 }
861}
862
863impl Command {
864 pub(super) fn to_mutation(
868 &self,
869 is_currently_paused: bool,
870 edges: &mut Option<FragmentEdgeBuildResult>,
871 control_stream_manager: &ControlStreamManager,
872 ) -> Option<Mutation> {
873 match self {
874 Command::Flush => None,
875
876 Command::Pause => {
877 if !is_currently_paused {
880 Some(Mutation::Pause(PauseMutation {}))
881 } else {
882 None
883 }
884 }
885
886 Command::Resume => {
887 if is_currently_paused {
889 Some(Mutation::Resume(ResumeMutation {}))
890 } else {
891 None
892 }
893 }
894
895 Command::SourceChangeSplit(SplitState {
896 split_assignment, ..
897 }) => {
898 let mut diff = HashMap::new();
899
900 for actor_splits in split_assignment.values() {
901 diff.extend(actor_splits.clone());
902 }
903
904 Some(Mutation::Splits(SourceChangeSplitMutation {
905 actor_splits: build_actor_connector_splits(&diff),
906 }))
907 }
908
909 Command::Throttle(config) => {
910 let mut actor_to_apply = HashMap::new();
911 for per_fragment in config.values() {
912 actor_to_apply.extend(
913 per_fragment
914 .iter()
915 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
916 );
917 }
918
919 Some(Mutation::Throttle(ThrottleMutation {
920 actor_throttle: actor_to_apply,
921 }))
922 }
923
924 Command::DropStreamingJobs {
925 actors,
926 dropped_sink_fragment_by_targets,
927 ..
928 } => Some(Mutation::Stop(StopMutation {
929 actors: actors.clone(),
930 dropped_sink_fragments: dropped_sink_fragment_by_targets
931 .values()
932 .flatten()
933 .cloned()
934 .collect(),
935 })),
936
937 Command::CreateStreamingJob {
938 info:
939 CreateStreamingJobCommandInfo {
940 stream_job_fragments: table_fragments,
941 init_split_assignment: split_assignment,
942 upstream_fragment_downstreams,
943 fragment_backfill_ordering,
944 cdc_table_snapshot_split_assignment,
945 ..
946 },
947 job_type,
948 ..
949 } => {
950 let edges = edges.as_mut().expect("should exist");
951 let added_actors = table_fragments.actor_ids();
952 let actor_splits = split_assignment
953 .values()
954 .flat_map(build_actor_connector_splits)
955 .collect();
956 let subscriptions_to_add =
957 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
958 job_type
959 {
960 snapshot_backfill_info
961 .upstream_mv_table_id_to_backfill_epoch
962 .keys()
963 .map(|table_id| SubscriptionUpstreamInfo {
964 subscriber_id: table_fragments.stream_job_id().table_id,
965 upstream_mv_table_id: table_id.table_id,
966 })
967 .collect()
968 } else {
969 Default::default()
970 };
971 let backfill_nodes_to_pause: Vec<_> =
972 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
973 .into_iter()
974 .collect();
975
976 let new_upstream_sinks =
977 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
978 sink_fragment_id,
979 sink_output_fields,
980 project_exprs,
981 new_sink_downstream,
982 ..
983 }) = job_type
984 {
985 let new_sink_actors = table_fragments
986 .actors_to_create()
987 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
988 .exactly_one()
989 .map(|(_, _, actors)| {
990 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
991 actor_id: actor.actor_id,
992 host: Some(control_stream_manager.host_addr(worker_id)),
993 })
994 })
995 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
996 let new_upstream_sink = PbNewUpstreamSink {
997 info: Some(PbUpstreamSinkInfo {
998 upstream_fragment_id: *sink_fragment_id,
999 sink_output_schema: sink_output_fields.clone(),
1000 project_exprs: project_exprs.clone(),
1001 }),
1002 upstream_actors: new_sink_actors.collect(),
1003 };
1004 HashMap::from([(
1005 new_sink_downstream.downstream_fragment_id,
1006 new_upstream_sink,
1007 )])
1008 } else {
1009 HashMap::new()
1010 };
1011
1012 let add_mutation = AddMutation {
1013 actor_dispatchers: edges
1014 .dispatchers
1015 .extract_if(|fragment_id, _| {
1016 upstream_fragment_downstreams.contains_key(fragment_id)
1017 })
1018 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1019 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1020 .collect(),
1021 added_actors,
1022 actor_splits,
1023 pause: is_currently_paused,
1025 subscriptions_to_add,
1026 backfill_nodes_to_pause,
1027 actor_cdc_table_snapshot_splits:
1028 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1029 cdc_table_snapshot_split_assignment.clone(),
1030 )
1031 .into(),
1032 new_upstream_sinks,
1033 };
1034
1035 Some(Mutation::Add(add_mutation))
1036 }
1037 Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
1038 Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1039 info: jobs_to_merge
1040 .iter()
1041 .flat_map(|(table_id, (backfill_upstream_tables, _))| {
1042 backfill_upstream_tables
1043 .iter()
1044 .map(move |upstream_table_id| SubscriptionUpstreamInfo {
1045 subscriber_id: table_id.table_id,
1046 upstream_mv_table_id: upstream_table_id.table_id,
1047 })
1048 })
1049 .collect(),
1050 }))
1051 }
1052
1053 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1054 old_fragments,
1055 replace_upstream,
1056 upstream_fragment_downstreams,
1057 init_split_assignment,
1058 auto_refresh_schema_sinks,
1059 cdc_table_snapshot_split_assignment,
1060 ..
1061 }) => {
1062 let edges = edges.as_mut().expect("should exist");
1063 let merge_updates = edges
1064 .merge_updates
1065 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1066 .collect();
1067 let dispatchers = edges
1068 .dispatchers
1069 .extract_if(|fragment_id, _| {
1070 upstream_fragment_downstreams.contains_key(fragment_id)
1071 })
1072 .collect();
1073 let cdc_table_snapshot_split_assignment =
1074 if cdc_table_snapshot_split_assignment.is_empty() {
1075 CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1076 } else {
1077 CdcTableSnapshotSplitAssignmentWithGeneration::new(
1078 cdc_table_snapshot_split_assignment.clone(),
1079 control_stream_manager
1080 .env
1081 .cdc_table_backfill_tracker
1082 .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
1083 )
1084 };
1085 Self::generate_update_mutation_for_replace_table(
1086 old_fragments.actor_ids().into_iter().chain(
1087 auto_refresh_schema_sinks
1088 .as_ref()
1089 .into_iter()
1090 .flat_map(|sinks| {
1091 sinks.iter().flat_map(|sink| {
1092 sink.original_fragment
1093 .actors
1094 .iter()
1095 .map(|actor| actor.actor_id)
1096 })
1097 }),
1098 ),
1099 merge_updates,
1100 dispatchers,
1101 init_split_assignment,
1102 cdc_table_snapshot_split_assignment,
1103 auto_refresh_schema_sinks.as_ref(),
1104 )
1105 }
1106
1107 Command::RescheduleFragment {
1108 reschedules,
1109 fragment_actors,
1110 ..
1111 } => {
1112 let mut dispatcher_update = HashMap::new();
1113 for reschedule in reschedules.values() {
1114 for &(upstream_fragment_id, dispatcher_id) in
1115 &reschedule.upstream_fragment_dispatcher_ids
1116 {
1117 let upstream_actor_ids = fragment_actors
1119 .get(&upstream_fragment_id)
1120 .expect("should contain");
1121
1122 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1123
1124 for &actor_id in upstream_actor_ids {
1126 let added_downstream_actor_id = if upstream_reschedule
1127 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1128 .unwrap_or(true)
1129 {
1130 reschedule
1131 .added_actors
1132 .values()
1133 .flatten()
1134 .cloned()
1135 .collect()
1136 } else {
1137 Default::default()
1138 };
1139 dispatcher_update
1141 .try_insert(
1142 (actor_id, dispatcher_id),
1143 DispatcherUpdate {
1144 actor_id,
1145 dispatcher_id,
1146 hash_mapping: reschedule
1147 .upstream_dispatcher_mapping
1148 .as_ref()
1149 .map(|m| m.to_protobuf()),
1150 added_downstream_actor_id,
1151 removed_downstream_actor_id: reschedule
1152 .removed_actors
1153 .iter()
1154 .cloned()
1155 .collect(),
1156 },
1157 )
1158 .unwrap();
1159 }
1160 }
1161 }
1162 let dispatcher_update = dispatcher_update.into_values().collect();
1163
1164 let mut merge_update = HashMap::new();
1165 for (&fragment_id, reschedule) in reschedules {
1166 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1167 let downstream_actor_ids = fragment_actors
1169 .get(&downstream_fragment_id)
1170 .expect("should contain");
1171
1172 let downstream_removed_actors: HashSet<_> = reschedules
1176 .get(&downstream_fragment_id)
1177 .map(|downstream_reschedule| {
1178 downstream_reschedule
1179 .removed_actors
1180 .iter()
1181 .copied()
1182 .collect()
1183 })
1184 .unwrap_or_default();
1185
1186 for &actor_id in downstream_actor_ids {
1188 if downstream_removed_actors.contains(&actor_id) {
1189 continue;
1190 }
1191
1192 merge_update
1194 .try_insert(
1195 (actor_id, fragment_id),
1196 MergeUpdate {
1197 actor_id,
1198 upstream_fragment_id: fragment_id,
1199 new_upstream_fragment_id: None,
1200 added_upstream_actors: reschedule
1201 .added_actors
1202 .iter()
1203 .flat_map(|(worker_id, actors)| {
1204 let host =
1205 control_stream_manager.host_addr(*worker_id);
1206 actors.iter().map(move |actor_id| PbActorInfo {
1207 actor_id: *actor_id,
1208 host: Some(host.clone()),
1209 })
1210 })
1211 .collect(),
1212 removed_upstream_actor_id: reschedule
1213 .removed_actors
1214 .iter()
1215 .cloned()
1216 .collect(),
1217 },
1218 )
1219 .unwrap();
1220 }
1221 }
1222 }
1223 let merge_update = merge_update.into_values().collect();
1224
1225 let mut actor_vnode_bitmap_update = HashMap::new();
1226 for reschedule in reschedules.values() {
1227 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1229 let bitmap = bitmap.to_protobuf();
1230 actor_vnode_bitmap_update
1231 .try_insert(actor_id, bitmap)
1232 .unwrap();
1233 }
1234 }
1235 let dropped_actors = reschedules
1236 .values()
1237 .flat_map(|r| r.removed_actors.iter().copied())
1238 .collect();
1239 let mut actor_splits = HashMap::new();
1240 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1241 let mut cdc_table_ids: HashSet<_> = HashSet::default();
1242 for reschedule in reschedules.values() {
1243 for (actor_id, splits) in &reschedule.actor_splits {
1244 actor_splits.insert(
1245 *actor_id as ActorId,
1246 ConnectorSplits {
1247 splits: splits.iter().map(ConnectorSplit::from).collect(),
1248 },
1249 );
1250 }
1251 actor_cdc_table_snapshot_splits.extend(
1252 build_pb_actor_cdc_table_snapshot_splits(
1253 reschedule.cdc_table_snapshot_split_assignment.clone(),
1254 ),
1255 );
1256 if let Some(cdc_table_id) = reschedule.cdc_table_id {
1257 cdc_table_ids.insert(cdc_table_id);
1258 }
1259 }
1260
1261 let actor_new_dispatchers = HashMap::new();
1263 let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1264 {
1265 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1266 CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1267 )
1268 .into()
1269 } else {
1270 PbCdcTableSnapshotSplitsWithGeneration {
1271 splits: actor_cdc_table_snapshot_splits,
1272 generation: control_stream_manager
1273 .env
1274 .cdc_table_backfill_tracker
1275 .next_generation(cdc_table_ids.into_iter()),
1276 }
1277 .into()
1278 };
1279 let mutation = Mutation::Update(UpdateMutation {
1280 dispatcher_update,
1281 merge_update,
1282 actor_vnode_bitmap_update,
1283 dropped_actors,
1284 actor_splits,
1285 actor_new_dispatchers,
1286 actor_cdc_table_snapshot_splits,
1287 sink_add_columns: Default::default(),
1288 });
1289 tracing::debug!("update mutation: {mutation:?}");
1290 Some(mutation)
1291 }
1292
1293 Command::CreateSubscription {
1294 upstream_mv_table_id,
1295 subscription_id,
1296 ..
1297 } => Some(Mutation::Add(AddMutation {
1298 actor_dispatchers: Default::default(),
1299 added_actors: vec![],
1300 actor_splits: Default::default(),
1301 pause: false,
1302 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1303 upstream_mv_table_id: upstream_mv_table_id.table_id,
1304 subscriber_id: *subscription_id,
1305 }],
1306 backfill_nodes_to_pause: vec![],
1307 actor_cdc_table_snapshot_splits: Default::default(),
1308 new_upstream_sinks: Default::default(),
1309 })),
1310 Command::DropSubscription {
1311 upstream_mv_table_id,
1312 subscription_id,
1313 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1314 info: vec![SubscriptionUpstreamInfo {
1315 subscriber_id: *subscription_id,
1316 upstream_mv_table_id: upstream_mv_table_id.table_id,
1317 }],
1318 })),
1319 Command::ConnectorPropsChange(config) => {
1320 let mut connector_props_infos = HashMap::default();
1321 for (k, v) in config {
1322 connector_props_infos.insert(
1323 *k,
1324 ConnectorPropsInfo {
1325 connector_props_info: v.clone(),
1326 },
1327 );
1328 }
1329 Some(Mutation::ConnectorPropsChange(
1330 ConnectorPropsChangeMutation {
1331 connector_props_infos,
1332 },
1333 ))
1334 }
1335 Command::StartFragmentBackfill { fragment_ids } => Some(
1336 Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1337 fragment_ids: fragment_ids.clone(),
1338 }),
1339 ),
1340 Command::Refresh {
1341 table_id,
1342 associated_source_id,
1343 } => Some(Mutation::RefreshStart(
1344 risingwave_pb::stream_plan::RefreshStartMutation {
1345 table_id: table_id.table_id,
1346 associated_source_id: associated_source_id.table_id,
1347 },
1348 )),
1349 Command::ListFinish {
1350 table_id: _,
1351 associated_source_id,
1352 } => Some(Mutation::ListFinish(ListFinishMutation {
1353 associated_source_id: associated_source_id.table_id,
1354 })),
1355 Command::LoadFinish {
1356 table_id: _,
1357 associated_source_id,
1358 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1359 associated_source_id: associated_source_id.table_id,
1360 })),
1361 }
1362 }
1363
1364 pub(super) fn actors_to_create(
1365 &self,
1366 graph_info: &InflightDatabaseInfo,
1367 edges: &mut Option<FragmentEdgeBuildResult>,
1368 control_stream_manager: &ControlStreamManager,
1369 ) -> Option<StreamJobActorsToCreate> {
1370 match self {
1371 Command::CreateStreamingJob { info, job_type, .. } => {
1372 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1373 return None;
1375 }
1376 let actors_to_create = info.stream_job_fragments.actors_to_create();
1377 let edges = edges.as_mut().expect("should exist");
1378 Some(edges.collect_actors_to_create(actors_to_create))
1379 }
1380 Command::RescheduleFragment {
1381 reschedules,
1382 fragment_actors,
1383 ..
1384 } => {
1385 let mut actor_upstreams = Self::collect_actor_upstreams(
1386 reschedules.iter().map(|(fragment_id, reschedule)| {
1387 (
1388 *fragment_id,
1389 reschedule.newly_created_actors.values().map(
1390 |((actor, dispatchers), _)| {
1391 (actor.actor_id, dispatchers.as_slice())
1392 },
1393 ),
1394 )
1395 }),
1396 Some((reschedules, fragment_actors)),
1397 graph_info,
1398 control_stream_manager,
1399 );
1400 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
1401 for (fragment_id, (actor, dispatchers), worker_id) in
1402 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1403 reschedule
1404 .newly_created_actors
1405 .values()
1406 .map(|(actors, status)| (*fragment_id, actors, status))
1407 })
1408 {
1409 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1410 map.entry(*worker_id)
1411 .or_default()
1412 .entry(fragment_id)
1413 .or_insert_with(|| {
1414 let node = graph_info.fragment(fragment_id).nodes.clone();
1415 (node, vec![])
1416 })
1417 .1
1418 .push((actor.clone(), upstreams, dispatchers.clone()));
1419 }
1420 Some(map)
1421 }
1422 Command::ReplaceStreamJob(replace_table) => {
1423 let edges = edges.as_mut().expect("should exist");
1424 let mut actors =
1425 edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
1426 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1427 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1428 (
1429 sink.new_fragment.fragment_id,
1430 &sink.new_fragment.nodes,
1431 sink.new_fragment.actors.iter().map(|actor| {
1432 (
1433 actor,
1434 sink.actor_status[&actor.actor_id]
1435 .location
1436 .as_ref()
1437 .unwrap()
1438 .worker_node_id as _,
1439 )
1440 }),
1441 )
1442 }));
1443 for (worker_id, fragment_actors) in sink_actors {
1444 actors.entry(worker_id).or_default().extend(fragment_actors);
1445 }
1446 }
1447 Some(actors)
1448 }
1449 _ => None,
1450 }
1451 }
1452
1453 fn generate_update_mutation_for_replace_table(
1454 dropped_actors: impl IntoIterator<Item = ActorId>,
1455 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1456 dispatchers: FragmentActorDispatchers,
1457 init_split_assignment: &SplitAssignment,
1458 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1459 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1460 ) -> Option<Mutation> {
1461 let dropped_actors = dropped_actors.into_iter().collect();
1462
1463 let actor_new_dispatchers = dispatchers
1464 .into_values()
1465 .flatten()
1466 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1467 .collect();
1468
1469 let actor_splits = init_split_assignment
1470 .values()
1471 .flat_map(build_actor_connector_splits)
1472 .collect();
1473 Some(Mutation::Update(UpdateMutation {
1474 actor_new_dispatchers,
1475 merge_update: merge_updates.into_values().flatten().collect(),
1476 dropped_actors,
1477 actor_splits,
1478 actor_cdc_table_snapshot_splits:
1479 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1480 cdc_table_snapshot_split_assignment,
1481 )
1482 .into(),
1483 sink_add_columns: auto_refresh_schema_sinks
1484 .as_ref()
1485 .into_iter()
1486 .flat_map(|sinks| {
1487 sinks.iter().map(|sink| {
1488 (
1489 sink.original_sink.id,
1490 PbSinkAddColumns {
1491 fields: sink
1492 .newly_add_fields
1493 .iter()
1494 .map(|field| field.to_prost())
1495 .collect(),
1496 },
1497 )
1498 })
1499 })
1500 .collect(),
1501 ..Default::default()
1502 }))
1503 }
1504
1505 pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
1507 match self {
1508 Command::DropStreamingJobs {
1509 streaming_job_ids, ..
1510 } => Some(streaming_job_ids.iter().cloned()),
1511 _ => None,
1512 }
1513 .into_iter()
1514 .flatten()
1515 }
1516}
1517
1518impl Command {
1519 #[expect(clippy::type_complexity)]
1520 pub(super) fn collect_actor_upstreams(
1521 actor_dispatchers: impl Iterator<
1522 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1523 >,
1524 reschedule_dispatcher_update: Option<(
1525 &HashMap<FragmentId, Reschedule>,
1526 &HashMap<FragmentId, HashSet<ActorId>>,
1527 )>,
1528 graph_info: &InflightDatabaseInfo,
1529 control_stream_manager: &ControlStreamManager,
1530 ) -> HashMap<ActorId, ActorUpstreams> {
1531 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1532 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1533 let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1534 for (upstream_actor_id, dispatchers) in upstream_actors {
1535 let upstream_actor_location =
1536 upstream_fragment.actors[&upstream_actor_id].worker_id;
1537 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1538 for downstream_actor_id in dispatchers
1539 .iter()
1540 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1541 {
1542 actor_upstreams
1543 .entry(*downstream_actor_id)
1544 .or_default()
1545 .entry(upstream_fragment_id)
1546 .or_default()
1547 .insert(
1548 upstream_actor_id,
1549 PbActorInfo {
1550 actor_id: upstream_actor_id,
1551 host: Some(upstream_actor_host.clone()),
1552 },
1553 );
1554 }
1555 }
1556 }
1557 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1558 for reschedule in reschedules.values() {
1559 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1560 let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1561 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1562 for upstream_actor_id in fragment_actors
1563 .get(upstream_fragment_id)
1564 .expect("should exist")
1565 {
1566 let upstream_actor_location =
1567 upstream_fragment.actors[upstream_actor_id].worker_id;
1568 let upstream_actor_host =
1569 control_stream_manager.host_addr(upstream_actor_location);
1570 if let Some(upstream_reschedule) = upstream_reschedule
1571 && upstream_reschedule
1572 .removed_actors
1573 .contains(upstream_actor_id)
1574 {
1575 continue;
1576 }
1577 for (_, downstream_actor_id) in
1578 reschedule
1579 .added_actors
1580 .iter()
1581 .flat_map(|(worker_id, actors)| {
1582 actors.iter().map(|actor| (*worker_id, *actor))
1583 })
1584 {
1585 actor_upstreams
1586 .entry(downstream_actor_id)
1587 .or_default()
1588 .entry(*upstream_fragment_id)
1589 .or_default()
1590 .insert(
1591 *upstream_actor_id,
1592 PbActorInfo {
1593 actor_id: *upstream_actor_id,
1594 host: Some(upstream_actor_host.clone()),
1595 },
1596 );
1597 }
1598 }
1599 }
1600 }
1601 }
1602 actor_upstreams
1603 }
1604}