1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::ControlStreamManager;
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76 SplitState, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
/// The changes applied to one fragment when it is rescheduled.
///
/// Instances are stored per `FragmentId` in `Command::RescheduleFragment`.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to the fragment, grouped by the worker that hosts them.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from the fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps, keyed by actor. Entries for actors that also appear in
    /// `newly_created_actors` are skipped when the fragment changes are built.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher)` pairs; a dispatcher update is generated for every
    /// actor of each listed upstream fragment.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Hash mapping sent along with those dispatcher updates, if any.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose remaining actors receive a merge update describing the
    /// added and removed actors of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source splits per actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Definition and hosting worker of each newly created actor. The vnode bitmap of an added
    /// actor is read from here, so every actor in `added_actors` must have an entry.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
110
/// Everything needed to swap the fragments of an existing streaming job for a new set.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced; each one is recorded as removed.
    pub old_fragments: StreamJobFragments,
    /// Fragments that take their place; each one is recorded as a new fragment.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Keyed by the fragment whose upstream changes; each value maps an existing upstream
    /// fragment id to the fragment id replacing it.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Its keys select which fragments' dispatchers are taken from the edge build result when
    /// the update mutation is generated.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split assignment used when building the info of the new fragments.
    pub init_split_assignment: SplitAssignment,
    /// The job being replaced; its id becomes the `job_id` of the new fragments.
    pub streaming_job: StreamingJob,
    /// NOTE(review): presumably the temporary job id the new fragments were built under; it is
    /// not read in this part of the file — confirm against its consumers.
    pub tmp_id: JobId,
    /// NOTE(review): presumably the state tables to drop once the job is replaced; it is not
    /// read in this part of the file — confirm against its consumers.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks replaced together with the job: each contributes one new fragment and removes its
    /// original fragment.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
139
140impl ReplaceStreamJobPlan {
141 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142 let mut fragment_changes = HashMap::new();
143 for (fragment_id, new_fragment) in self
144 .new_fragments
145 .new_fragment_info(&self.init_split_assignment)
146 {
147 let fragment_change = CommandFragmentChanges::NewFragment {
148 job_id: self.streaming_job.id(),
149 info: new_fragment,
150 };
151 fragment_changes
152 .try_insert(fragment_id, fragment_change)
153 .expect("non-duplicate");
154 }
155 for fragment in self.old_fragments.fragments.values() {
156 fragment_changes
157 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158 .expect("non-duplicate");
159 }
160 for (fragment_id, replace_map) in &self.replace_upstream {
161 fragment_changes
162 .try_insert(
163 *fragment_id,
164 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165 )
166 .expect("non-duplicate");
167 }
168 if let Some(sinks) = &self.auto_refresh_schema_sinks {
169 for sink in sinks {
170 let fragment_change = CommandFragmentChanges::NewFragment {
171 job_id: sink.original_sink.id.as_job_id(),
172 info: sink.new_fragment_info(),
173 };
174 fragment_changes
175 .try_insert(sink.new_fragment.fragment_id, fragment_change)
176 .expect("non-duplicate");
177 fragment_changes
178 .try_insert(
179 sink.original_fragment.fragment_id,
180 CommandFragmentChanges::RemoveFragment,
181 )
182 .expect("non-duplicate");
183 }
184 }
185 fragment_changes
186 }
187
188 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190 let mut fragment_replacements = HashMap::new();
191 for (upstream_fragment_id, new_upstream_fragment_id) in
192 self.replace_upstream.values().flatten()
193 {
194 {
195 let r =
196 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197 if let Some(r) = r {
198 assert_eq!(
199 *new_upstream_fragment_id, r,
200 "one fragment is replaced by multiple fragments"
201 );
202 }
203 }
204 }
205 fragment_replacements
206 }
207}
208
/// Parameters carried by `Command::CreateStreamingJob`, independent of the job flavor.
///
/// `stream_job_fragments` is left out of the `Debug` output.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the job being created.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Its keys select which upstream fragments' dispatchers are attached to the `Add` mutation.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the new actors.
    pub init_split_assignment: SplitAssignment,
    /// NOTE(review): presumably the job's SQL definition; it is not read in this part of the
    /// file — confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    /// The job being created; supplies the job id and the name shown by `Display`.
    pub streaming_job: StreamingJob,
    /// Backfill ordering; used to compute which backfill nodes start paused.
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    /// When present, a `CdcTableBackfillTracker` is created for the job from these splits.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// NOTE(review): not read in this part of the file; by its type it maps a fragment to a
    /// list of state table ids — confirm the intended meaning with its consumers.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}
224
225impl StreamJobFragments {
226 pub(super) fn new_fragment_info<'a>(
227 &'a self,
228 assignment: &'a SplitAssignment,
229 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
230 self.fragments.values().map(|fragment| {
231 let mut fragment_splits = assignment
232 .get(&fragment.fragment_id)
233 .cloned()
234 .unwrap_or_default();
235
236 (
237 fragment.fragment_id,
238 InflightFragmentInfo {
239 fragment_id: fragment.fragment_id,
240 distribution_type: fragment.distribution_type.into(),
241 fragment_type_mask: fragment.fragment_type_mask,
242 vnode_count: fragment.vnode_count(),
243 nodes: fragment.nodes.clone(),
244 actors: fragment
245 .actors
246 .iter()
247 .map(|actor| {
248 (
249 actor.actor_id,
250 InflightActorInfo {
251 worker_id: self
252 .actor_status
253 .get(&actor.actor_id)
254 .expect("should exist")
255 .worker_id(),
256 vnode_bitmap: actor.vnode_bitmap.clone(),
257 splits: fragment_splits
258 .remove(&actor.actor_id)
259 .unwrap_or_default(),
260 },
261 )
262 })
263 .collect(),
264 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
265 },
266 )
267 })
268 }
269}
270
/// Upstream information for a job created with snapshot backfill.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized-view table ids mapped to an optional backfill epoch. The keys are
    /// the tables the new job is subscribed to when its `Add` mutation is built.
    /// NOTE(review): the meaning of a `None` epoch is not visible in this part of the file.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
278
/// The flavor of a `Command::CreateStreamingJob`.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A job with no extra handling.
    Normal,
    /// A sink into a table: the sink fragment is additionally registered as a new upstream of
    /// the downstream fragment named in the `UpstreamSinkInfo`.
    SinkIntoTable(UpstreamSinkInfo),
    /// A job created with snapshot backfill. `Command::fragment_changes` must not be called
    /// for this flavor; its fragment changes are handled separately.
    SnapshotBackfill(SnapshotBackfillInfo),
}
285
/// An operation carried by a barrier.
///
/// `fragment_changes` describes how a command alters the tracked fragments, and `to_mutation`
/// turns it into the `Mutation` sent with the barrier.
#[derive(Debug)]
pub enum Command {
    /// Changes no fragments and carries no mutation.
    Flush,

    /// Emits a `PauseMutation`, unless the stream is already paused.
    Pause,

    /// Emits a `ResumeMutation`, but only if the stream is currently paused. This is the only
    /// command for which `need_checkpoint` returns `false`.
    Resume,

    /// Drops streaming jobs by stopping their actors.
    DropStreamingJobs {
        /// Ids of the dropped jobs; shown by `Display`.
        streaming_job_ids: HashSet<JobId>,
        /// Actors to stop.
        actors: Vec<ActorId>,
        /// NOTE(review): presumably the state tables to unregister; not read in this part of
        /// the file — confirm.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments recorded as removed.
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// For each target fragment, the sink fragments dropped from its upstreams.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a streaming job by adding its fragments and actors.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// NOTE(review): not read in this part of the file; presumably the upstream tables in
        /// other databases that are snapshot-backfilled — confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedules the actors of a set of fragments.
    RescheduleFragment {
        /// The reschedule plan of each affected fragment.
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Actor ids per fragment. It must contain every upstream and downstream fragment
        /// referenced by `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// Replaces the fragments of an existing streaming job with a new set.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Applies a new source split assignment to the actors of the listed fragments.
    SourceChangeSplit(SplitState),

    /// Changes rate limits.
    Throttle {
        /// NOTE(review): presumably the jobs owning the throttled fragments; not read in this
        /// part of the file — confirm.
        jobs: HashSet<JobId>,
        /// Rate limit per fragment, forwarded as is into the `ThrottleMutation`.
        config: HashMap<FragmentId, Option<u32>>,
    },

    /// Adds a subscription on an upstream materialized view.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// NOTE(review): presumably the log retention in seconds; not read in this part of the
        /// file — confirm.
        retention_second: u64,
    },

    /// Drops a subscription from an upstream materialized view.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Forwards changed connector properties to the stream.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Starts a refresh of `table_id`; emits a `RefreshStartMutation`.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Emits a `ListFinishMutation` for the associated source.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// NOTE(review): presumably the load-phase counterpart of `ListFinish`; its mutation is
    /// built outside the visible part of the file — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}
399
400impl std::fmt::Display for Command {
402 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
403 match self {
404 Command::Flush => write!(f, "Flush"),
405 Command::Pause => write!(f, "Pause"),
406 Command::Resume => write!(f, "Resume"),
407 Command::DropStreamingJobs {
408 streaming_job_ids, ..
409 } => {
410 write!(
411 f,
412 "DropStreamingJobs: {}",
413 streaming_job_ids.iter().sorted().join(", ")
414 )
415 }
416 Command::CreateStreamingJob { info, .. } => {
417 write!(f, "CreateStreamingJob: {}", info.streaming_job)
418 }
419 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
420 Command::ReplaceStreamJob(plan) => {
421 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
422 }
423 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
424 Command::Throttle { .. } => write!(f, "Throttle"),
425 Command::CreateSubscription {
426 subscription_id, ..
427 } => write!(f, "CreateSubscription: {subscription_id}"),
428 Command::DropSubscription {
429 subscription_id, ..
430 } => write!(f, "DropSubscription: {subscription_id}"),
431 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
432 Command::Refresh {
433 table_id,
434 associated_source_id,
435 } => write!(
436 f,
437 "Refresh: {} (source: {})",
438 table_id, associated_source_id
439 ),
440 Command::ListFinish {
441 table_id,
442 associated_source_id,
443 } => write!(
444 f,
445 "ListFinish: {} (source: {})",
446 table_id, associated_source_id
447 ),
448 Command::LoadFinish {
449 table_id,
450 associated_source_id,
451 } => write!(
452 f,
453 "LoadFinish: {} (source: {})",
454 table_id, associated_source_id
455 ),
456 }
457 }
458}
459
460impl Command {
461 pub fn pause() -> Self {
462 Self::Pause
463 }
464
465 pub fn resume() -> Self {
466 Self::Resume
467 }
468
469 #[expect(clippy::type_complexity)]
470 pub(super) fn fragment_changes(
471 &self,
472 ) -> Option<(
473 Option<(JobId, Option<CdcTableBackfillTracker>)>,
474 HashMap<FragmentId, CommandFragmentChanges>,
475 )> {
476 match self {
477 Command::Flush => None,
478 Command::Pause => None,
479 Command::Resume => None,
480 Command::DropStreamingJobs {
481 unregistered_fragment_ids,
482 dropped_sink_fragment_by_targets,
483 ..
484 } => {
485 let changes = unregistered_fragment_ids
486 .iter()
487 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
488 .chain(dropped_sink_fragment_by_targets.iter().map(
489 |(target_fragment, sink_fragments)| {
490 (
491 *target_fragment,
492 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
493 )
494 },
495 ))
496 .collect();
497
498 Some((None, changes))
499 }
500 Command::CreateStreamingJob { info, job_type, .. } => {
501 assert!(
502 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
503 "should handle fragment changes separately for snapshot backfill"
504 );
505 let mut changes: HashMap<_, _> = info
506 .stream_job_fragments
507 .new_fragment_info(&info.init_split_assignment)
508 .map(|(fragment_id, fragment_infos)| {
509 (
510 fragment_id,
511 CommandFragmentChanges::NewFragment {
512 job_id: info.streaming_job.id(),
513 info: fragment_infos,
514 },
515 )
516 })
517 .collect();
518
519 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
520 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
521 changes.insert(
522 downstream_fragment_id,
523 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
524 upstream_fragment_id: ctx.sink_fragment_id,
525 sink_output_schema: ctx.sink_output_fields.clone(),
526 project_exprs: ctx.project_exprs.clone(),
527 }),
528 );
529 }
530
531 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
532 let (fragment, _) =
533 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
534 .expect("should have parallel cdc fragment");
535 Some(CdcTableBackfillTracker::new(
536 fragment.fragment_id,
537 splits.clone(),
538 ))
539 } else {
540 None
541 };
542
543 Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
544 }
545 Command::RescheduleFragment { reschedules, .. } => Some((
546 None,
547 reschedules
548 .iter()
549 .map(|(fragment_id, reschedule)| {
550 (
551 *fragment_id,
552 CommandFragmentChanges::Reschedule {
553 new_actors: reschedule
554 .added_actors
555 .iter()
556 .flat_map(|(node_id, actors)| {
557 actors.iter().map(|actor_id| {
558 (
559 *actor_id,
560 InflightActorInfo {
561 worker_id: *node_id,
562 vnode_bitmap: reschedule
563 .newly_created_actors
564 .get(actor_id)
565 .expect("should exist")
566 .0
567 .0
568 .vnode_bitmap
569 .clone(),
570 splits: reschedule
571 .actor_splits
572 .get(actor_id)
573 .cloned()
574 .unwrap_or_default(),
575 },
576 )
577 })
578 })
579 .collect(),
580 actor_update_vnode_bitmap: reschedule
581 .vnode_bitmap_updates
582 .iter()
583 .filter(|(actor_id, _)| {
584 !reschedule.newly_created_actors.contains_key(actor_id)
586 })
587 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
588 .collect(),
589 to_remove: reschedule.removed_actors.iter().cloned().collect(),
590 actor_splits: reschedule.actor_splits.clone(),
591 },
592 )
593 })
594 .collect(),
595 )),
596 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
597 Command::SourceChangeSplit(SplitState {
598 split_assignment, ..
599 }) => Some((
600 None,
601 split_assignment
602 .iter()
603 .map(|(&fragment_id, splits)| {
604 (
605 fragment_id,
606 CommandFragmentChanges::SplitAssignment {
607 actor_splits: splits.clone(),
608 },
609 )
610 })
611 .collect(),
612 )),
613 Command::Throttle { .. } => None,
614 Command::CreateSubscription { .. } => None,
615 Command::DropSubscription { .. } => None,
616 Command::ConnectorPropsChange(_) => None,
617 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, }
621 }
622
623 pub fn need_checkpoint(&self) -> bool {
624 !matches!(self, Command::Resume)
626 }
627}
628
/// The kind of a barrier, mirroring `PbBarrierKind`.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier. It carries a list of epochs, which is passed to
    /// `build_table_change_log_delta` when the commit info is collected.
    /// NOTE(review): presumably the epochs covered since the previous checkpoint — confirm.
    Checkpoint(Vec<u64>),
}
636
637impl BarrierKind {
638 pub fn to_protobuf(&self) -> PbBarrierKind {
639 match self {
640 BarrierKind::Initial => PbBarrierKind::Initial,
641 BarrierKind::Barrier => PbBarrierKind::Barrier,
642 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
643 }
644 }
645
646 pub fn is_checkpoint(&self) -> bool {
647 matches!(self, BarrierKind::Checkpoint(_))
648 }
649
650 pub fn is_initial(&self) -> bool {
651 matches!(self, BarrierKind::Initial)
652 }
653
654 pub fn as_str_name(&self) -> &'static str {
655 match self {
656 BarrierKind::Initial => "Initial",
657 BarrierKind::Barrier => "Barrier",
658 BarrierKind::Checkpoint(_) => "Checkpoint",
659 }
660 }
661}
662
/// A barrier together with the optional command it carries and the data needed to commit its
/// epoch.
pub(super) struct CommandContext {
    /// Retention, in seconds, per materialized-view table; used to compute the epoch up to
    /// which each table's change log is truncated.
    /// NOTE(review): by its name, the maximum retention over the table's subscriptions —
    /// confirm with the caller that builds this map.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    /// Epochs and kind of the barrier.
    pub(super) barrier_info: BarrierInfo,

    /// Tables committed at this barrier's previous epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command carried by the barrier, if any.
    pub(super) command: Option<Command>,

    /// Held for the lifetime of the context but never read here.
    /// NOTE(review): presumably kept so the tracing span stays open while the barrier is in
    /// flight — confirm.
    _span: tracing::Span,
}
681
682impl std::fmt::Debug for CommandContext {
683 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
684 f.debug_struct("CommandContext")
685 .field("barrier_info", &self.barrier_info)
686 .field("command", &self.command)
687 .finish()
688 }
689}
690
691impl std::fmt::Display for CommandContext {
692 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
693 write!(
694 f,
695 "prev_epoch={}, curr_epoch={}, kind={}",
696 self.barrier_info.prev_epoch(),
697 self.barrier_info.curr_epoch(),
698 self.barrier_info.kind.as_str_name()
699 )?;
700 if let Some(command) = &self.command {
701 write!(f, ", command={}", command)?;
702 }
703 Ok(())
704 }
705}
706
707impl CommandContext {
708 pub(super) fn new(
709 barrier_info: BarrierInfo,
710 mv_subscription_max_retention: HashMap<TableId, u64>,
711 table_ids_to_commit: HashSet<TableId>,
712 command: Option<Command>,
713 span: tracing::Span,
714 ) -> Self {
715 Self {
716 mv_subscription_max_retention,
717 barrier_info,
718 table_ids_to_commit,
719 command,
720 _span: span,
721 }
722 }
723
724 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
725 let Some(truncate_timestamptz) = Timestamptz::from_secs(
726 self.barrier_info
727 .prev_epoch
728 .value()
729 .as_timestamptz()
730 .timestamp()
731 - retention_second as i64,
732 ) else {
733 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
734 return self.barrier_info.prev_epoch.value();
735 };
736 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
737 }
738
739 pub(super) fn collect_commit_epoch_info(
740 &self,
741 info: &mut CommitEpochInfo,
742 resps: Vec<BarrierCompleteResponse>,
743 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
744 ) {
745 let (
746 sst_to_context,
747 synced_ssts,
748 new_table_watermarks,
749 old_value_ssts,
750 vector_index_adds,
751 truncate_tables,
752 ) = collect_resp_info(resps);
753
754 let new_table_fragment_infos =
755 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
756 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
757 {
758 let table_fragments = &info.stream_job_fragments;
759 let mut table_ids: HashSet<_> =
760 table_fragments.internal_table_ids().into_iter().collect();
761 if let Some(mv_table_id) = table_fragments.mv_table_id() {
762 table_ids.insert(mv_table_id);
763 }
764
765 vec![NewTableFragmentInfo { table_ids }]
766 } else {
767 vec![]
768 };
769
770 let mut mv_log_store_truncate_epoch = HashMap::new();
771 let mut update_truncate_epoch =
773 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
774 Entry::Occupied(mut entry) => {
775 let prev_truncate_epoch = entry.get_mut();
776 if truncate_epoch < *prev_truncate_epoch {
777 *prev_truncate_epoch = truncate_epoch;
778 }
779 }
780 Entry::Vacant(entry) => {
781 entry.insert(truncate_epoch);
782 }
783 };
784 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
785 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
786 update_truncate_epoch(*mv_table_id, truncate_epoch);
787 }
788 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
789 for mv_table_id in upstream_mv_table_ids {
790 update_truncate_epoch(mv_table_id, backfill_epoch);
791 }
792 }
793
794 let table_new_change_log = build_table_change_log_delta(
795 old_value_ssts.into_iter(),
796 synced_ssts.iter().map(|sst| &sst.sst_info),
797 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
798 mv_log_store_truncate_epoch.into_iter(),
799 );
800
801 let epoch = self.barrier_info.prev_epoch();
802 for table_id in &self.table_ids_to_commit {
803 info.tables_to_commit
804 .try_insert(*table_id, epoch)
805 .expect("non duplicate");
806 }
807
808 info.sstables.extend(synced_ssts);
809 info.new_table_watermarks.extend(new_table_watermarks);
810 info.sst_to_context.extend(sst_to_context);
811 info.new_table_fragment_infos
812 .extend(new_table_fragment_infos);
813 info.change_log_delta.extend(table_new_change_log);
814 for (table_id, vector_index_adds) in vector_index_adds {
815 info.vector_index_delta
816 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
817 .expect("non-duplicate");
818 }
819 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
820 for fragment in job_info.stream_job_fragments.fragments.values() {
821 visit_stream_node_cont(&fragment.nodes, |node| {
822 match node.node_body.as_ref().unwrap() {
823 NodeBody::VectorIndexWrite(vector_index_write) => {
824 let index_table = vector_index_write.table.as_ref().unwrap();
825 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
826 info.vector_index_delta
827 .try_insert(
828 index_table.id,
829 VectorIndexDelta::Init(PbVectorIndexInit {
830 info: Some(index_table.vector_index_info.unwrap()),
831 }),
832 )
833 .expect("non-duplicate");
834 false
835 }
836 _ => true,
837 }
838 })
839 }
840 }
841 info.truncate_tables.extend(truncate_tables);
842 }
843}
844
845impl Command {
846 pub(super) fn to_mutation(
850 &self,
851 is_currently_paused: bool,
852 edges: &mut Option<FragmentEdgeBuildResult>,
853 control_stream_manager: &ControlStreamManager,
854 database_info: &mut InflightDatabaseInfo,
855 ) -> MetaResult<Option<Mutation>> {
856 let mutation = match self {
857 Command::Flush => None,
858
859 Command::Pause => {
860 if !is_currently_paused {
863 Some(Mutation::Pause(PauseMutation {}))
864 } else {
865 None
866 }
867 }
868
869 Command::Resume => {
870 if is_currently_paused {
872 Some(Mutation::Resume(ResumeMutation {}))
873 } else {
874 None
875 }
876 }
877
878 Command::SourceChangeSplit(SplitState {
879 split_assignment, ..
880 }) => {
881 let mut diff = HashMap::new();
882
883 for actor_splits in split_assignment.values() {
884 diff.extend(actor_splits.clone());
885 }
886
887 Some(Mutation::Splits(SourceChangeSplitMutation {
888 actor_splits: build_actor_connector_splits(&diff),
889 }))
890 }
891
892 Command::Throttle { config, .. } => {
893 let mut fragment_to_apply = HashMap::new();
894 for (fragment_id, limit) in config {
895 fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
896 }
897
898 Some(Mutation::Throttle(ThrottleMutation {
899 fragment_throttle: fragment_to_apply,
900 }))
901 }
902
903 Command::DropStreamingJobs {
904 actors,
905 dropped_sink_fragment_by_targets,
906 ..
907 } => Some(Mutation::Stop(StopMutation {
908 actors: actors.clone(),
909 dropped_sink_fragments: dropped_sink_fragment_by_targets
910 .values()
911 .flatten()
912 .cloned()
913 .collect(),
914 })),
915
916 Command::CreateStreamingJob {
917 info:
918 CreateStreamingJobCommandInfo {
919 stream_job_fragments,
920 init_split_assignment: split_assignment,
921 upstream_fragment_downstreams,
922 fragment_backfill_ordering,
923 ..
924 },
925 job_type,
926 ..
927 } => {
928 let edges = edges.as_mut().expect("should exist");
929 let added_actors = stream_job_fragments.actor_ids().collect();
930 let actor_splits = split_assignment
931 .values()
932 .flat_map(build_actor_connector_splits)
933 .collect();
934 let subscriptions_to_add =
935 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
936 job_type
937 {
938 snapshot_backfill_info
939 .upstream_mv_table_id_to_backfill_epoch
940 .keys()
941 .map(|table_id| SubscriptionUpstreamInfo {
942 subscriber_id: stream_job_fragments
943 .stream_job_id()
944 .as_subscriber_id(),
945 upstream_mv_table_id: *table_id,
946 })
947 .collect()
948 } else {
949 Default::default()
950 };
951 let backfill_nodes_to_pause: Vec<_> =
952 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
953 .into_iter()
954 .collect();
955
956 let new_upstream_sinks =
957 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
958 sink_fragment_id,
959 sink_output_fields,
960 project_exprs,
961 new_sink_downstream,
962 ..
963 }) = job_type
964 {
965 let new_sink_actors = stream_job_fragments
966 .actors_to_create()
967 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
968 .exactly_one()
969 .map(|(_, _, actors)| {
970 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
971 actor_id: actor.actor_id,
972 host: Some(control_stream_manager.host_addr(worker_id)),
973 })
974 })
975 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
976 let new_upstream_sink = PbNewUpstreamSink {
977 info: Some(PbUpstreamSinkInfo {
978 upstream_fragment_id: *sink_fragment_id,
979 sink_output_schema: sink_output_fields.clone(),
980 project_exprs: project_exprs.clone(),
981 }),
982 upstream_actors: new_sink_actors.collect(),
983 };
984 HashMap::from([(
985 new_sink_downstream.downstream_fragment_id,
986 new_upstream_sink,
987 )])
988 } else {
989 HashMap::new()
990 };
991
992 let actor_cdc_table_snapshot_splits =
993 if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
994 database_info
995 .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
996 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
997 } else {
998 None
999 };
1000
1001 let add_mutation = AddMutation {
1002 actor_dispatchers: edges
1003 .dispatchers
1004 .extract_if(|fragment_id, _| {
1005 upstream_fragment_downstreams.contains_key(fragment_id)
1006 })
1007 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1008 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1009 .collect(),
1010 added_actors,
1011 actor_splits,
1012 pause: is_currently_paused,
1014 subscriptions_to_add,
1015 backfill_nodes_to_pause,
1016 actor_cdc_table_snapshot_splits,
1017 new_upstream_sinks,
1018 };
1019
1020 Some(Mutation::Add(add_mutation))
1021 }
1022
1023 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1024 old_fragments,
1025 replace_upstream,
1026 upstream_fragment_downstreams,
1027 init_split_assignment,
1028 auto_refresh_schema_sinks,
1029 ..
1030 }) => {
1031 let edges = edges.as_mut().expect("should exist");
1032 let merge_updates = edges
1033 .merge_updates
1034 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1035 .collect();
1036 let dispatchers = edges
1037 .dispatchers
1038 .extract_if(|fragment_id, _| {
1039 upstream_fragment_downstreams.contains_key(fragment_id)
1040 })
1041 .collect();
1042 let actor_cdc_table_snapshot_splits = database_info
1043 .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1044 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1045 Self::generate_update_mutation_for_replace_table(
1046 old_fragments.actor_ids().chain(
1047 auto_refresh_schema_sinks
1048 .as_ref()
1049 .into_iter()
1050 .flat_map(|sinks| {
1051 sinks.iter().flat_map(|sink| {
1052 sink.original_fragment
1053 .actors
1054 .iter()
1055 .map(|actor| actor.actor_id)
1056 })
1057 }),
1058 ),
1059 merge_updates,
1060 dispatchers,
1061 init_split_assignment,
1062 actor_cdc_table_snapshot_splits,
1063 auto_refresh_schema_sinks.as_ref(),
1064 )
1065 }
1066
1067 Command::RescheduleFragment {
1068 reschedules,
1069 fragment_actors,
1070 ..
1071 } => {
1072 let mut dispatcher_update = HashMap::new();
1073 for reschedule in reschedules.values() {
1074 for &(upstream_fragment_id, dispatcher_id) in
1075 &reschedule.upstream_fragment_dispatcher_ids
1076 {
1077 let upstream_actor_ids = fragment_actors
1079 .get(&upstream_fragment_id)
1080 .expect("should contain");
1081
1082 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1083
1084 for &actor_id in upstream_actor_ids {
1086 let added_downstream_actor_id = if upstream_reschedule
1087 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1088 .unwrap_or(true)
1089 {
1090 reschedule
1091 .added_actors
1092 .values()
1093 .flatten()
1094 .cloned()
1095 .collect()
1096 } else {
1097 Default::default()
1098 };
1099 dispatcher_update
1101 .try_insert(
1102 (actor_id, dispatcher_id),
1103 DispatcherUpdate {
1104 actor_id,
1105 dispatcher_id,
1106 hash_mapping: reschedule
1107 .upstream_dispatcher_mapping
1108 .as_ref()
1109 .map(|m| m.to_protobuf()),
1110 added_downstream_actor_id,
1111 removed_downstream_actor_id: reschedule
1112 .removed_actors
1113 .iter()
1114 .cloned()
1115 .collect(),
1116 },
1117 )
1118 .unwrap();
1119 }
1120 }
1121 }
1122 let dispatcher_update = dispatcher_update.into_values().collect();
1123
1124 let mut merge_update = HashMap::new();
1125 for (&fragment_id, reschedule) in reschedules {
1126 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1127 let downstream_actor_ids = fragment_actors
1129 .get(&downstream_fragment_id)
1130 .expect("should contain");
1131
1132 let downstream_removed_actors: HashSet<_> = reschedules
1136 .get(&downstream_fragment_id)
1137 .map(|downstream_reschedule| {
1138 downstream_reschedule
1139 .removed_actors
1140 .iter()
1141 .copied()
1142 .collect()
1143 })
1144 .unwrap_or_default();
1145
1146 for &actor_id in downstream_actor_ids {
1148 if downstream_removed_actors.contains(&actor_id) {
1149 continue;
1150 }
1151
1152 merge_update
1154 .try_insert(
1155 (actor_id, fragment_id),
1156 MergeUpdate {
1157 actor_id,
1158 upstream_fragment_id: fragment_id,
1159 new_upstream_fragment_id: None,
1160 added_upstream_actors: reschedule
1161 .added_actors
1162 .iter()
1163 .flat_map(|(worker_id, actors)| {
1164 let host =
1165 control_stream_manager.host_addr(*worker_id);
1166 actors.iter().map(move |&actor_id| PbActorInfo {
1167 actor_id,
1168 host: Some(host.clone()),
1169 })
1170 })
1171 .collect(),
1172 removed_upstream_actor_id: reschedule
1173 .removed_actors
1174 .iter()
1175 .cloned()
1176 .collect(),
1177 },
1178 )
1179 .unwrap();
1180 }
1181 }
1182 }
1183 let merge_update = merge_update.into_values().collect();
1184
1185 let mut actor_vnode_bitmap_update = HashMap::new();
1186 for reschedule in reschedules.values() {
1187 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1189 let bitmap = bitmap.to_protobuf();
1190 actor_vnode_bitmap_update
1191 .try_insert(actor_id, bitmap)
1192 .unwrap();
1193 }
1194 }
1195 let dropped_actors = reschedules
1196 .values()
1197 .flat_map(|r| r.removed_actors.iter().copied())
1198 .collect();
1199 let mut actor_splits = HashMap::new();
1200 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1201 for (fragment_id, reschedule) in reschedules {
1202 for (actor_id, splits) in &reschedule.actor_splits {
1203 actor_splits.insert(
1204 *actor_id,
1205 ConnectorSplits {
1206 splits: splits.iter().map(ConnectorSplit::from).collect(),
1207 },
1208 );
1209 }
1210
1211 if let Some(assignment) =
1212 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1213 {
1214 actor_cdc_table_snapshot_splits.extend(assignment)
1215 }
1216 }
1217
1218 let actor_new_dispatchers = HashMap::new();
1220 let mutation = Mutation::Update(UpdateMutation {
1221 dispatcher_update,
1222 merge_update,
1223 actor_vnode_bitmap_update,
1224 dropped_actors,
1225 actor_splits,
1226 actor_new_dispatchers,
1227 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1228 splits: actor_cdc_table_snapshot_splits,
1229 }),
1230 sink_schema_change: Default::default(),
1231 subscriptions_to_drop: vec![],
1232 });
1233 tracing::debug!("update mutation: {mutation:?}");
1234 Some(mutation)
1235 }
1236
1237 Command::CreateSubscription {
1238 upstream_mv_table_id,
1239 subscription_id,
1240 ..
1241 } => Some(Mutation::Add(AddMutation {
1242 actor_dispatchers: Default::default(),
1243 added_actors: vec![],
1244 actor_splits: Default::default(),
1245 pause: false,
1246 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1247 upstream_mv_table_id: *upstream_mv_table_id,
1248 subscriber_id: subscription_id.as_subscriber_id(),
1249 }],
1250 backfill_nodes_to_pause: vec![],
1251 actor_cdc_table_snapshot_splits: None,
1252 new_upstream_sinks: Default::default(),
1253 })),
1254 Command::DropSubscription {
1255 upstream_mv_table_id,
1256 subscription_id,
1257 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1258 info: vec![SubscriptionUpstreamInfo {
1259 subscriber_id: subscription_id.as_subscriber_id(),
1260 upstream_mv_table_id: *upstream_mv_table_id,
1261 }],
1262 })),
1263 Command::ConnectorPropsChange(config) => {
1264 let mut connector_props_infos = HashMap::default();
1265 for (k, v) in config {
1266 connector_props_infos.insert(
1267 k.as_raw_id(),
1268 ConnectorPropsInfo {
1269 connector_props_info: v.clone(),
1270 },
1271 );
1272 }
1273 Some(Mutation::ConnectorPropsChange(
1274 ConnectorPropsChangeMutation {
1275 connector_props_infos,
1276 },
1277 ))
1278 }
1279 Command::Refresh {
1280 table_id,
1281 associated_source_id,
1282 } => Some(Mutation::RefreshStart(
1283 risingwave_pb::stream_plan::RefreshStartMutation {
1284 table_id: *table_id,
1285 associated_source_id: *associated_source_id,
1286 },
1287 )),
1288 Command::ListFinish {
1289 table_id: _,
1290 associated_source_id,
1291 } => Some(Mutation::ListFinish(ListFinishMutation {
1292 associated_source_id: *associated_source_id,
1293 })),
1294 Command::LoadFinish {
1295 table_id: _,
1296 associated_source_id,
1297 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1298 associated_source_id: *associated_source_id,
1299 })),
1300 };
1301 Ok(mutation)
1302 }
1303
1304 pub(super) fn actors_to_create(
1305 &self,
1306 database_info: &InflightDatabaseInfo,
1307 edges: &mut Option<FragmentEdgeBuildResult>,
1308 control_stream_manager: &ControlStreamManager,
1309 ) -> Option<StreamJobActorsToCreate> {
1310 match self {
1311 Command::CreateStreamingJob { info, job_type, .. } => {
1312 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1313 return None;
1315 }
1316 let actors_to_create = info.stream_job_fragments.actors_to_create();
1317 let edges = edges.as_mut().expect("should exist");
1318 Some(edges.collect_actors_to_create(actors_to_create.map(
1319 |(fragment_id, node, actors)| {
1320 (
1321 fragment_id,
1322 node,
1323 actors,
1324 [], )
1326 },
1327 )))
1328 }
1329 Command::RescheduleFragment {
1330 reschedules,
1331 fragment_actors,
1332 ..
1333 } => {
1334 let mut actor_upstreams = Self::collect_actor_upstreams(
1335 reschedules.iter().map(|(fragment_id, reschedule)| {
1336 (
1337 *fragment_id,
1338 reschedule.newly_created_actors.values().map(
1339 |((actor, dispatchers), _)| {
1340 (actor.actor_id, dispatchers.as_slice())
1341 },
1342 ),
1343 )
1344 }),
1345 Some((reschedules, fragment_actors)),
1346 database_info,
1347 control_stream_manager,
1348 );
1349 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1350 for (fragment_id, (actor, dispatchers), worker_id) in
1351 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1352 reschedule
1353 .newly_created_actors
1354 .values()
1355 .map(|(actors, status)| (*fragment_id, actors, status))
1356 })
1357 {
1358 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1359 map.entry(*worker_id)
1360 .or_default()
1361 .entry(fragment_id)
1362 .or_insert_with(|| {
1363 let node = database_info.fragment(fragment_id).nodes.clone();
1364 let subscribers =
1365 database_info.fragment_subscribers(fragment_id).collect();
1366 (node, vec![], subscribers)
1367 })
1368 .1
1369 .push((actor.clone(), upstreams, dispatchers.clone()));
1370 }
1371 Some(map)
1372 }
1373 Command::ReplaceStreamJob(replace_table) => {
1374 let edges = edges.as_mut().expect("should exist");
1375 let mut actors = edges.collect_actors_to_create(
1376 replace_table.new_fragments.actors_to_create().map(
1377 |(fragment_id, node, actors)| {
1378 (
1379 fragment_id,
1380 node,
1381 actors,
1382 database_info
1383 .job_subscribers(replace_table.old_fragments.stream_job_id),
1384 )
1385 },
1386 ),
1387 );
1388 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1389 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1390 (
1391 sink.new_fragment.fragment_id,
1392 &sink.new_fragment.nodes,
1393 sink.new_fragment.actors.iter().map(|actor| {
1394 (
1395 actor,
1396 sink.actor_status[&actor.actor_id]
1397 .location
1398 .as_ref()
1399 .unwrap()
1400 .worker_node_id,
1401 )
1402 }),
1403 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1404 )
1405 }));
1406 for (worker_id, fragment_actors) in sink_actors {
1407 actors.entry(worker_id).or_default().extend(fragment_actors);
1408 }
1409 }
1410 Some(actors)
1411 }
1412 _ => None,
1413 }
1414 }
1415
1416 fn generate_update_mutation_for_replace_table(
1417 dropped_actors: impl IntoIterator<Item = ActorId>,
1418 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1419 dispatchers: FragmentActorDispatchers,
1420 init_split_assignment: &SplitAssignment,
1421 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1422 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1423 ) -> Option<Mutation> {
1424 let dropped_actors = dropped_actors.into_iter().collect();
1425
1426 let actor_new_dispatchers = dispatchers
1427 .into_values()
1428 .flatten()
1429 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1430 .collect();
1431
1432 let actor_splits = init_split_assignment
1433 .values()
1434 .flat_map(build_actor_connector_splits)
1435 .collect();
1436 Some(Mutation::Update(UpdateMutation {
1437 actor_new_dispatchers,
1438 merge_update: merge_updates.into_values().flatten().collect(),
1439 dropped_actors,
1440 actor_splits,
1441 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1442 sink_schema_change: auto_refresh_schema_sinks
1443 .as_ref()
1444 .into_iter()
1445 .flat_map(|sinks| {
1446 sinks.iter().map(|sink| {
1447 (
1448 sink.original_sink.id.as_raw_id(),
1449 PbSinkSchemaChange {
1450 original_schema: sink
1451 .original_sink
1452 .columns
1453 .iter()
1454 .map(|col| PbField {
1455 data_type: Some(
1456 col.column_desc
1457 .as_ref()
1458 .unwrap()
1459 .column_type
1460 .as_ref()
1461 .unwrap()
1462 .clone(),
1463 ),
1464 name: col.column_desc.as_ref().unwrap().name.clone(),
1465 })
1466 .collect(),
1467 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1468 fields: sink
1469 .newly_add_fields
1470 .iter()
1471 .map(|field| field.to_prost())
1472 .collect(),
1473 })),
1474 },
1475 )
1476 })
1477 })
1478 .collect(),
1479 ..Default::default()
1480 }))
1481 }
1482
1483 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1485 match self {
1486 Command::DropStreamingJobs {
1487 streaming_job_ids, ..
1488 } => Some(streaming_job_ids.iter().cloned()),
1489 _ => None,
1490 }
1491 .into_iter()
1492 .flatten()
1493 }
1494}
1495
1496impl Command {
1497 #[expect(clippy::type_complexity)]
1498 pub(super) fn collect_actor_upstreams(
1499 actor_dispatchers: impl Iterator<
1500 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1501 >,
1502 reschedule_dispatcher_update: Option<(
1503 &HashMap<FragmentId, Reschedule>,
1504 &HashMap<FragmentId, HashSet<ActorId>>,
1505 )>,
1506 database_info: &InflightDatabaseInfo,
1507 control_stream_manager: &ControlStreamManager,
1508 ) -> HashMap<ActorId, ActorUpstreams> {
1509 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1510 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1511 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1512 for (upstream_actor_id, dispatchers) in upstream_actors {
1513 let upstream_actor_location =
1514 upstream_fragment.actors[&upstream_actor_id].worker_id;
1515 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1516 for downstream_actor_id in dispatchers
1517 .iter()
1518 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1519 {
1520 actor_upstreams
1521 .entry(*downstream_actor_id)
1522 .or_default()
1523 .entry(upstream_fragment_id)
1524 .or_default()
1525 .insert(
1526 upstream_actor_id,
1527 PbActorInfo {
1528 actor_id: upstream_actor_id,
1529 host: Some(upstream_actor_host.clone()),
1530 },
1531 );
1532 }
1533 }
1534 }
1535 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1536 for reschedule in reschedules.values() {
1537 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1538 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1539 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1540 for upstream_actor_id in fragment_actors
1541 .get(upstream_fragment_id)
1542 .expect("should exist")
1543 {
1544 let upstream_actor_location =
1545 upstream_fragment.actors[upstream_actor_id].worker_id;
1546 let upstream_actor_host =
1547 control_stream_manager.host_addr(upstream_actor_location);
1548 if let Some(upstream_reschedule) = upstream_reschedule
1549 && upstream_reschedule
1550 .removed_actors
1551 .contains(upstream_actor_id)
1552 {
1553 continue;
1554 }
1555 for (_, downstream_actor_id) in
1556 reschedule
1557 .added_actors
1558 .iter()
1559 .flat_map(|(worker_id, actors)| {
1560 actors.iter().map(|actor| (*worker_id, *actor))
1561 })
1562 {
1563 actor_upstreams
1564 .entry(downstream_actor_id)
1565 .or_default()
1566 .entry(*upstream_fragment_id)
1567 .or_default()
1568 .insert(
1569 *upstream_actor_id,
1570 PbActorInfo {
1571 actor_id: *upstream_actor_id,
1572 host: Some(upstream_actor_host.clone()),
1573 },
1574 );
1575 }
1576 }
1577 }
1578 }
1579 }
1580 actor_upstreams
1581 }
1582}