1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
24use risingwave_common::id::{JobId, SourceId};
25use risingwave_common::must_match;
26use risingwave_common::types::Timestamptz;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::{
31 CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
32 build_pb_actor_cdc_table_snapshot_splits,
33 build_pb_actor_cdc_table_snapshot_splits_with_generation,
34};
35use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
36use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
37use risingwave_meta_model::WorkerId;
38use risingwave_pb::catalog::CreateType;
39use risingwave_pb::catalog::table::PbTableType;
40use risingwave_pb::common::PbActorInfo;
41use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
42use risingwave_pb::source::{
43 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
44};
45use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
46use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
47use risingwave_pb::stream_plan::barrier_mutation::Mutation;
48use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
49use risingwave_pb::stream_plan::stream_node::NodeBody;
50use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
51use risingwave_pb::stream_plan::update_mutation::*;
52use risingwave_pb::stream_plan::{
53 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
54 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
55 ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
56 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
57};
58use risingwave_pb::stream_service::BarrierCompleteResponse;
59use tracing::warn;
60
61use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
74};
75use crate::stream::{
76 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
77 SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
78};
79
80#[derive(Debug, Clone)]
83pub struct Reschedule {
84 pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
86
87 pub removed_actors: HashSet<ActorId>,
89
90 pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
92
93 pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
95 pub upstream_dispatcher_mapping: Option<ActorMapping>,
100
101 pub downstream_fragment_ids: Vec<FragmentId>,
103
104 pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
108
109 pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
110
111 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
112
113 pub cdc_table_job_id: Option<JobId>,
114}
115
116#[derive(Debug, Clone)]
123pub struct ReplaceStreamJobPlan {
124 pub old_fragments: StreamJobFragments,
125 pub new_fragments: StreamJobFragmentsToCreate,
126 pub replace_upstream: FragmentReplaceUpstream,
129 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
130 pub init_split_assignment: SplitAssignment,
136 pub streaming_job: StreamingJob,
138 pub tmp_id: JobId,
140 pub to_drop_state_table_ids: Vec<TableId>,
142 pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
143 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
144}
145
146impl ReplaceStreamJobPlan {
147 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
148 let mut fragment_changes = HashMap::new();
149 for (fragment_id, new_fragment) in self
150 .new_fragments
151 .new_fragment_info(&self.init_split_assignment)
152 {
153 let fragment_change = CommandFragmentChanges::NewFragment {
154 job_id: self.streaming_job.id(),
155 info: new_fragment,
156 is_existing: false,
157 };
158 fragment_changes
159 .try_insert(fragment_id, fragment_change)
160 .expect("non-duplicate");
161 }
162 for fragment in self.old_fragments.fragments.values() {
163 fragment_changes
164 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
165 .expect("non-duplicate");
166 }
167 for (fragment_id, replace_map) in &self.replace_upstream {
168 fragment_changes
169 .try_insert(
170 *fragment_id,
171 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
172 )
173 .expect("non-duplicate");
174 }
175 if let Some(sinks) = &self.auto_refresh_schema_sinks {
176 for sink in sinks {
177 let fragment_change = CommandFragmentChanges::NewFragment {
178 job_id: sink.original_sink.id.as_job_id(),
179 info: sink.new_fragment_info(),
180 is_existing: false,
181 };
182 fragment_changes
183 .try_insert(sink.new_fragment.fragment_id, fragment_change)
184 .expect("non-duplicate");
185 fragment_changes
186 .try_insert(
187 sink.original_fragment.fragment_id,
188 CommandFragmentChanges::RemoveFragment,
189 )
190 .expect("non-duplicate");
191 }
192 }
193 fragment_changes
194 }
195
196 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
198 let mut fragment_replacements = HashMap::new();
199 for (upstream_fragment_id, new_upstream_fragment_id) in
200 self.replace_upstream.values().flatten()
201 {
202 {
203 let r =
204 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
205 if let Some(r) = r {
206 assert_eq!(
207 *new_upstream_fragment_id, r,
208 "one fragment is replaced by multiple fragments"
209 );
210 }
211 }
212 }
213 fragment_replacements
214 }
215}
216
217#[derive(educe::Educe, Clone)]
218#[educe(Debug)]
219pub struct CreateStreamingJobCommandInfo {
220 #[educe(Debug(ignore))]
221 pub stream_job_fragments: StreamJobFragmentsToCreate,
222 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
223 pub init_split_assignment: SplitAssignment,
224 pub definition: String,
225 pub job_type: StreamingJobType,
226 pub create_type: CreateType,
227 pub streaming_job: StreamingJob,
228 pub fragment_backfill_ordering: FragmentBackfillOrder,
229 pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
230 pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
231}
232
233impl StreamJobFragments {
234 pub(super) fn new_fragment_info<'a>(
235 &'a self,
236 assignment: &'a SplitAssignment,
237 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
238 self.fragments.values().map(|fragment| {
239 let mut fragment_splits = assignment
240 .get(&fragment.fragment_id)
241 .cloned()
242 .unwrap_or_default();
243
244 (
245 fragment.fragment_id,
246 InflightFragmentInfo {
247 fragment_id: fragment.fragment_id,
248 distribution_type: fragment.distribution_type.into(),
249 fragment_type_mask: fragment.fragment_type_mask,
250 vnode_count: fragment.vnode_count(),
251 nodes: fragment.nodes.clone(),
252 actors: fragment
253 .actors
254 .iter()
255 .map(|actor| {
256 (
257 actor.actor_id,
258 InflightActorInfo {
259 worker_id: self
260 .actor_status
261 .get(&actor.actor_id)
262 .expect("should exist")
263 .worker_id(),
264 vnode_bitmap: actor.vnode_bitmap.clone(),
265 splits: fragment_splits
266 .remove(&actor.actor_id)
267 .unwrap_or_default(),
268 },
269 )
270 })
271 .collect(),
272 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
273 },
274 )
275 })
276 }
277}
278
279#[derive(Debug, Clone)]
280pub struct SnapshotBackfillInfo {
281 pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
285}
286
287#[derive(Debug, Clone)]
288pub enum CreateStreamingJobType {
289 Normal,
290 SinkIntoTable(UpstreamSinkInfo),
291 SnapshotBackfill(SnapshotBackfillInfo),
292}
293
294#[derive(Debug)]
299pub enum Command {
300 Flush,
303
304 Pause,
307
308 Resume,
312
313 DropStreamingJobs {
321 streaming_job_ids: HashSet<JobId>,
322 actors: Vec<ActorId>,
323 unregistered_state_table_ids: HashSet<TableId>,
324 unregistered_fragment_ids: HashSet<FragmentId>,
325 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
327 },
328
329 CreateStreamingJob {
339 info: CreateStreamingJobCommandInfo,
340 job_type: CreateStreamingJobType,
341 cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
342 },
343 MergeSnapshotBackfillStreamingJobs(
344 HashMap<JobId, (HashSet<TableId>, HashMap<FragmentId, InflightFragmentInfo>)>,
345 ),
346
347 RescheduleFragment {
353 reschedules: HashMap<FragmentId, Reschedule>,
354 fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
356 },
357
358 ReplaceStreamJob(ReplaceStreamJobPlan),
365
366 SourceChangeSplit(SplitState),
369
370 Throttle(ThrottleConfig),
373
374 CreateSubscription {
377 subscription_id: SubscriptionId,
378 upstream_mv_table_id: TableId,
379 retention_second: u64,
380 },
381
382 DropSubscription {
386 subscription_id: SubscriptionId,
387 upstream_mv_table_id: TableId,
388 },
389
390 ConnectorPropsChange(ConnectorPropsChange),
391
392 StartFragmentBackfill {
394 fragment_ids: Vec<FragmentId>,
395 },
396
397 Refresh {
400 table_id: TableId,
401 associated_source_id: SourceId,
402 },
403 ListFinish {
404 table_id: TableId,
405 associated_source_id: SourceId,
406 },
407 LoadFinish {
408 table_id: TableId,
409 associated_source_id: SourceId,
410 },
411}
412
413impl std::fmt::Display for Command {
415 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
416 match self {
417 Command::Flush => write!(f, "Flush"),
418 Command::Pause => write!(f, "Pause"),
419 Command::Resume => write!(f, "Resume"),
420 Command::DropStreamingJobs {
421 streaming_job_ids, ..
422 } => {
423 write!(
424 f,
425 "DropStreamingJobs: {}",
426 streaming_job_ids.iter().sorted().join(", ")
427 )
428 }
429 Command::CreateStreamingJob { info, .. } => {
430 write!(f, "CreateStreamingJob: {}", info.streaming_job)
431 }
432 Command::MergeSnapshotBackfillStreamingJobs(_) => {
433 write!(f, "MergeSnapshotBackfillStreamingJobs")
434 }
435 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
436 Command::ReplaceStreamJob(plan) => {
437 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
438 }
439 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
440 Command::Throttle(_) => write!(f, "Throttle"),
441 Command::CreateSubscription {
442 subscription_id, ..
443 } => write!(f, "CreateSubscription: {subscription_id}"),
444 Command::DropSubscription {
445 subscription_id, ..
446 } => write!(f, "DropSubscription: {subscription_id}"),
447 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
448 Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
449 Command::Refresh {
450 table_id,
451 associated_source_id,
452 } => write!(
453 f,
454 "Refresh: {} (source: {})",
455 table_id, associated_source_id
456 ),
457 Command::ListFinish {
458 table_id,
459 associated_source_id,
460 } => write!(
461 f,
462 "ListFinish: {} (source: {})",
463 table_id, associated_source_id
464 ),
465 Command::LoadFinish {
466 table_id,
467 associated_source_id,
468 } => write!(
469 f,
470 "LoadFinish: {} (source: {})",
471 table_id, associated_source_id
472 ),
473 }
474 }
475}
476
477impl Command {
478 pub fn pause() -> Self {
479 Self::Pause
480 }
481
482 pub fn resume() -> Self {
483 Self::Resume
484 }
485
486 pub(crate) fn fragment_changes(
487 &self,
488 ) -> Option<(Option<JobId>, HashMap<FragmentId, CommandFragmentChanges>)> {
489 match self {
490 Command::Flush => None,
491 Command::Pause => None,
492 Command::Resume => None,
493 Command::DropStreamingJobs {
494 unregistered_fragment_ids,
495 dropped_sink_fragment_by_targets,
496 ..
497 } => {
498 let changes = unregistered_fragment_ids
499 .iter()
500 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
501 .chain(dropped_sink_fragment_by_targets.iter().map(
502 |(target_fragment, sink_fragments)| {
503 (
504 *target_fragment,
505 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
506 )
507 },
508 ))
509 .collect();
510
511 Some((None, changes))
512 }
513 Command::CreateStreamingJob { info, job_type, .. } => {
514 assert!(
515 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
516 "should handle fragment changes separately for snapshot backfill"
517 );
518 let mut changes: HashMap<_, _> = info
519 .stream_job_fragments
520 .new_fragment_info(&info.init_split_assignment)
521 .map(|(fragment_id, fragment_info)| {
522 (
523 fragment_id,
524 CommandFragmentChanges::NewFragment {
525 job_id: info.streaming_job.id(),
526 info: fragment_info,
527 is_existing: false,
528 },
529 )
530 })
531 .collect();
532
533 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
534 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
535 changes.insert(
536 downstream_fragment_id,
537 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
538 upstream_fragment_id: ctx.sink_fragment_id,
539 sink_output_schema: ctx.sink_output_fields.clone(),
540 project_exprs: ctx.project_exprs.clone(),
541 }),
542 );
543 }
544
545 Some((Some(info.streaming_job.id()), changes))
546 }
547 Command::RescheduleFragment { reschedules, .. } => Some((
548 None,
549 reschedules
550 .iter()
551 .map(|(fragment_id, reschedule)| {
552 (
553 *fragment_id,
554 CommandFragmentChanges::Reschedule {
555 new_actors: reschedule
556 .added_actors
557 .iter()
558 .flat_map(|(node_id, actors)| {
559 actors.iter().map(|actor_id| {
560 (
561 *actor_id,
562 InflightActorInfo {
563 worker_id: *node_id,
564 vnode_bitmap: reschedule
565 .newly_created_actors
566 .get(actor_id)
567 .expect("should exist")
568 .0
569 .0
570 .vnode_bitmap
571 .clone(),
572 splits: reschedule
573 .actor_splits
574 .get(actor_id)
575 .cloned()
576 .unwrap_or_default(),
577 },
578 )
579 })
580 })
581 .collect(),
582 actor_update_vnode_bitmap: reschedule
583 .vnode_bitmap_updates
584 .iter()
585 .filter(|(actor_id, _)| {
586 !reschedule.newly_created_actors.contains_key(actor_id)
588 })
589 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
590 .collect(),
591 to_remove: reschedule.removed_actors.iter().cloned().collect(),
592 actor_splits: reschedule.actor_splits.clone(),
593 },
594 )
595 })
596 .collect(),
597 )),
598 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
599 Command::MergeSnapshotBackfillStreamingJobs(_) => None,
600 Command::SourceChangeSplit(SplitState {
601 split_assignment, ..
602 }) => Some((
603 None,
604 split_assignment
605 .iter()
606 .map(|(&fragment_id, splits)| {
607 (
608 fragment_id,
609 CommandFragmentChanges::SplitAssignment {
610 actor_splits: splits.clone(),
611 },
612 )
613 })
614 .collect(),
615 )),
616 Command::Throttle(_) => None,
617 Command::CreateSubscription { .. } => None,
618 Command::DropSubscription { .. } => None,
619 Command::ConnectorPropsChange(_) => None,
620 Command::StartFragmentBackfill { .. } => None,
621 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, }
625 }
626
627 pub fn need_checkpoint(&self) -> bool {
628 !matches!(self, Command::Resume)
630 }
631}
632
/// Kind of a barrier, mirroring the protobuf barrier kind.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// Maps to the protobuf `Initial` kind.
    Initial,
    /// Maps to the protobuf `Barrier` kind.
    Barrier,
    /// Maps to the protobuf `Checkpoint` kind and carries a list of epochs.
    ///
    /// NOTE(review): presumably the epochs covered by this checkpoint — confirm.
    Checkpoint(Vec<u64>),
}
640
641impl BarrierKind {
642 pub fn to_protobuf(&self) -> PbBarrierKind {
643 match self {
644 BarrierKind::Initial => PbBarrierKind::Initial,
645 BarrierKind::Barrier => PbBarrierKind::Barrier,
646 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
647 }
648 }
649
650 pub fn is_checkpoint(&self) -> bool {
651 matches!(self, BarrierKind::Checkpoint(_))
652 }
653
654 pub fn is_initial(&self) -> bool {
655 matches!(self, BarrierKind::Initial)
656 }
657
658 pub fn as_str_name(&self) -> &'static str {
659 match self {
660 BarrierKind::Initial => "Initial",
661 BarrierKind::Barrier => "Barrier",
662 BarrierKind::Checkpoint(_) => "Checkpoint",
663 }
664 }
665}
666
667pub(super) struct CommandContext {
670 mv_subscription_max_retention: HashMap<TableId, u64>,
671
672 pub(super) barrier_info: BarrierInfo,
673
674 pub(super) table_ids_to_commit: HashSet<TableId>,
675
676 pub(super) command: Option<Command>,
677
678 _span: tracing::Span,
684}
685
686impl std::fmt::Debug for CommandContext {
687 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
688 f.debug_struct("CommandContext")
689 .field("barrier_info", &self.barrier_info)
690 .field("command", &self.command)
691 .finish()
692 }
693}
694
695impl std::fmt::Display for CommandContext {
696 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
697 write!(
698 f,
699 "prev_epoch={}, curr_epoch={}, kind={}",
700 self.barrier_info.prev_epoch.value().0,
701 self.barrier_info.curr_epoch.value().0,
702 self.barrier_info.kind.as_str_name()
703 )?;
704 if let Some(command) = &self.command {
705 write!(f, ", command={}", command)?;
706 }
707 Ok(())
708 }
709}
710
711impl CommandContext {
712 pub(super) fn new(
713 barrier_info: BarrierInfo,
714 mv_subscription_max_retention: HashMap<TableId, u64>,
715 table_ids_to_commit: HashSet<TableId>,
716 command: Option<Command>,
717 span: tracing::Span,
718 ) -> Self {
719 Self {
720 mv_subscription_max_retention,
721 barrier_info,
722 table_ids_to_commit,
723 command,
724 _span: span,
725 }
726 }
727
728 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
729 let Some(truncate_timestamptz) = Timestamptz::from_secs(
730 self.barrier_info
731 .prev_epoch
732 .value()
733 .as_timestamptz()
734 .timestamp()
735 - retention_second as i64,
736 ) else {
737 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
738 return self.barrier_info.prev_epoch.value();
739 };
740 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
741 }
742
743 pub(super) fn collect_commit_epoch_info(
744 &self,
745 info: &mut CommitEpochInfo,
746 resps: Vec<BarrierCompleteResponse>,
747 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
748 ) {
749 let (
750 sst_to_context,
751 synced_ssts,
752 new_table_watermarks,
753 old_value_ssts,
754 vector_index_adds,
755 truncate_tables,
756 ) = collect_resp_info(resps);
757
758 let new_table_fragment_infos =
759 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
760 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
761 {
762 let table_fragments = &info.stream_job_fragments;
763 let mut table_ids: HashSet<_> =
764 table_fragments.internal_table_ids().into_iter().collect();
765 if let Some(mv_table_id) = table_fragments.mv_table_id() {
766 table_ids.insert(mv_table_id);
767 }
768
769 vec![NewTableFragmentInfo { table_ids }]
770 } else {
771 vec![]
772 };
773
774 let mut mv_log_store_truncate_epoch = HashMap::new();
775 let mut update_truncate_epoch =
777 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
778 Entry::Occupied(mut entry) => {
779 let prev_truncate_epoch = entry.get_mut();
780 if truncate_epoch < *prev_truncate_epoch {
781 *prev_truncate_epoch = truncate_epoch;
782 }
783 }
784 Entry::Vacant(entry) => {
785 entry.insert(truncate_epoch);
786 }
787 };
788 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
789 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
790 update_truncate_epoch(*mv_table_id, truncate_epoch);
791 }
792 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
793 for mv_table_id in upstream_mv_table_ids {
794 update_truncate_epoch(mv_table_id, backfill_epoch);
795 }
796 }
797
798 let table_new_change_log = build_table_change_log_delta(
799 old_value_ssts.into_iter(),
800 synced_ssts.iter().map(|sst| &sst.sst_info),
801 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
802 mv_log_store_truncate_epoch.into_iter(),
803 );
804
805 let epoch = self.barrier_info.prev_epoch();
806 for table_id in &self.table_ids_to_commit {
807 info.tables_to_commit
808 .try_insert(*table_id, epoch)
809 .expect("non duplicate");
810 }
811
812 info.sstables.extend(synced_ssts);
813 info.new_table_watermarks.extend(new_table_watermarks);
814 info.sst_to_context.extend(sst_to_context);
815 info.new_table_fragment_infos
816 .extend(new_table_fragment_infos);
817 info.change_log_delta.extend(table_new_change_log);
818 for (table_id, vector_index_adds) in vector_index_adds {
819 info.vector_index_delta
820 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
821 .expect("non-duplicate");
822 }
823 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
824 for fragment in job_info.stream_job_fragments.fragments.values() {
825 visit_stream_node_cont(&fragment.nodes, |node| {
826 match node.node_body.as_ref().unwrap() {
827 NodeBody::VectorIndexWrite(vector_index_write) => {
828 let index_table = vector_index_write.table.as_ref().unwrap();
829 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
830 info.vector_index_delta
831 .try_insert(
832 index_table.id,
833 VectorIndexDelta::Init(PbVectorIndexInit {
834 info: Some(index_table.vector_index_info.unwrap()),
835 }),
836 )
837 .expect("non-duplicate");
838 false
839 }
840 _ => true,
841 }
842 })
843 }
844 }
845 info.truncate_tables.extend(truncate_tables);
846 }
847}
848
849impl Command {
850 pub(super) fn to_mutation(
854 &self,
855 is_currently_paused: bool,
856 edges: &mut Option<FragmentEdgeBuildResult>,
857 control_stream_manager: &ControlStreamManager,
858 ) -> Option<Mutation> {
859 match self {
860 Command::Flush => None,
861
862 Command::Pause => {
863 if !is_currently_paused {
866 Some(Mutation::Pause(PauseMutation {}))
867 } else {
868 None
869 }
870 }
871
872 Command::Resume => {
873 if is_currently_paused {
875 Some(Mutation::Resume(ResumeMutation {}))
876 } else {
877 None
878 }
879 }
880
881 Command::SourceChangeSplit(SplitState {
882 split_assignment, ..
883 }) => {
884 let mut diff = HashMap::new();
885
886 for actor_splits in split_assignment.values() {
887 diff.extend(actor_splits.clone());
888 }
889
890 Some(Mutation::Splits(SourceChangeSplitMutation {
891 actor_splits: build_actor_connector_splits(&diff),
892 }))
893 }
894
895 Command::Throttle(config) => {
896 let mut actor_to_apply = HashMap::new();
897 for per_fragment in config.values() {
898 actor_to_apply.extend(
899 per_fragment
900 .iter()
901 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
902 );
903 }
904
905 Some(Mutation::Throttle(ThrottleMutation {
906 actor_throttle: actor_to_apply,
907 }))
908 }
909
910 Command::DropStreamingJobs {
911 actors,
912 dropped_sink_fragment_by_targets,
913 ..
914 } => Some(Mutation::Stop(StopMutation {
915 actors: actors.clone(),
916 dropped_sink_fragments: dropped_sink_fragment_by_targets
917 .values()
918 .flatten()
919 .cloned()
920 .collect(),
921 })),
922
923 Command::CreateStreamingJob {
924 info:
925 CreateStreamingJobCommandInfo {
926 stream_job_fragments: table_fragments,
927 init_split_assignment: split_assignment,
928 upstream_fragment_downstreams,
929 fragment_backfill_ordering,
930 cdc_table_snapshot_split_assignment,
931 ..
932 },
933 job_type,
934 ..
935 } => {
936 let edges = edges.as_mut().expect("should exist");
937 let added_actors = table_fragments.actor_ids().collect();
938 let actor_splits = split_assignment
939 .values()
940 .flat_map(build_actor_connector_splits)
941 .collect();
942 let subscriptions_to_add =
943 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
944 job_type
945 {
946 snapshot_backfill_info
947 .upstream_mv_table_id_to_backfill_epoch
948 .keys()
949 .map(|table_id| SubscriptionUpstreamInfo {
950 subscriber_id: table_fragments.stream_job_id().as_raw_id(),
951 upstream_mv_table_id: *table_id,
952 })
953 .collect()
954 } else {
955 Default::default()
956 };
957 let backfill_nodes_to_pause: Vec<_> =
958 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
959 .into_iter()
960 .collect();
961
962 let new_upstream_sinks =
963 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
964 sink_fragment_id,
965 sink_output_fields,
966 project_exprs,
967 new_sink_downstream,
968 ..
969 }) = job_type
970 {
971 let new_sink_actors = table_fragments
972 .actors_to_create()
973 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
974 .exactly_one()
975 .map(|(_, _, actors)| {
976 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
977 actor_id: actor.actor_id,
978 host: Some(control_stream_manager.host_addr(worker_id)),
979 })
980 })
981 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
982 let new_upstream_sink = PbNewUpstreamSink {
983 info: Some(PbUpstreamSinkInfo {
984 upstream_fragment_id: *sink_fragment_id,
985 sink_output_schema: sink_output_fields.clone(),
986 project_exprs: project_exprs.clone(),
987 }),
988 upstream_actors: new_sink_actors.collect(),
989 };
990 HashMap::from([(
991 new_sink_downstream.downstream_fragment_id,
992 new_upstream_sink,
993 )])
994 } else {
995 HashMap::new()
996 };
997
998 let add_mutation = AddMutation {
999 actor_dispatchers: edges
1000 .dispatchers
1001 .extract_if(|fragment_id, _| {
1002 upstream_fragment_downstreams.contains_key(fragment_id)
1003 })
1004 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1005 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1006 .collect(),
1007 added_actors,
1008 actor_splits,
1009 pause: is_currently_paused,
1011 subscriptions_to_add,
1012 backfill_nodes_to_pause,
1013 actor_cdc_table_snapshot_splits:
1014 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1015 cdc_table_snapshot_split_assignment.clone(),
1016 )
1017 .into(),
1018 new_upstream_sinks,
1019 };
1020
1021 Some(Mutation::Add(add_mutation))
1022 }
1023 Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
1024 Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1025 info: jobs_to_merge
1026 .iter()
1027 .flat_map(|(table_id, (backfill_upstream_tables, _))| {
1028 backfill_upstream_tables
1029 .iter()
1030 .map(move |upstream_table_id| SubscriptionUpstreamInfo {
1031 subscriber_id: table_id.as_raw_id(),
1032 upstream_mv_table_id: *upstream_table_id,
1033 })
1034 })
1035 .collect(),
1036 }))
1037 }
1038
1039 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1040 old_fragments,
1041 replace_upstream,
1042 upstream_fragment_downstreams,
1043 init_split_assignment,
1044 auto_refresh_schema_sinks,
1045 cdc_table_snapshot_split_assignment,
1046 ..
1047 }) => {
1048 let edges = edges.as_mut().expect("should exist");
1049 let merge_updates = edges
1050 .merge_updates
1051 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1052 .collect();
1053 let dispatchers = edges
1054 .dispatchers
1055 .extract_if(|fragment_id, _| {
1056 upstream_fragment_downstreams.contains_key(fragment_id)
1057 })
1058 .collect();
1059 let cdc_table_snapshot_split_assignment =
1060 if cdc_table_snapshot_split_assignment.is_empty() {
1061 CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1062 } else {
1063 CdcTableSnapshotSplitAssignmentWithGeneration::new(
1064 cdc_table_snapshot_split_assignment.clone(),
1065 control_stream_manager
1066 .env
1067 .cdc_table_backfill_tracker
1068 .next_generation(iter::once(old_fragments.stream_job_id)),
1069 )
1070 };
1071 Self::generate_update_mutation_for_replace_table(
1072 old_fragments.actor_ids().chain(
1073 auto_refresh_schema_sinks
1074 .as_ref()
1075 .into_iter()
1076 .flat_map(|sinks| {
1077 sinks.iter().flat_map(|sink| {
1078 sink.original_fragment
1079 .actors
1080 .iter()
1081 .map(|actor| actor.actor_id)
1082 })
1083 }),
1084 ),
1085 merge_updates,
1086 dispatchers,
1087 init_split_assignment,
1088 cdc_table_snapshot_split_assignment,
1089 auto_refresh_schema_sinks.as_ref(),
1090 )
1091 }
1092
1093 Command::RescheduleFragment {
1094 reschedules,
1095 fragment_actors,
1096 ..
1097 } => {
1098 let mut dispatcher_update = HashMap::new();
1099 for reschedule in reschedules.values() {
1100 for &(upstream_fragment_id, dispatcher_id) in
1101 &reschedule.upstream_fragment_dispatcher_ids
1102 {
1103 let upstream_actor_ids = fragment_actors
1105 .get(&upstream_fragment_id)
1106 .expect("should contain");
1107
1108 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1109
1110 for &actor_id in upstream_actor_ids {
1112 let added_downstream_actor_id = if upstream_reschedule
1113 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1114 .unwrap_or(true)
1115 {
1116 reschedule
1117 .added_actors
1118 .values()
1119 .flatten()
1120 .cloned()
1121 .collect()
1122 } else {
1123 Default::default()
1124 };
1125 dispatcher_update
1127 .try_insert(
1128 (actor_id, dispatcher_id),
1129 DispatcherUpdate {
1130 actor_id,
1131 dispatcher_id,
1132 hash_mapping: reschedule
1133 .upstream_dispatcher_mapping
1134 .as_ref()
1135 .map(|m| m.to_protobuf()),
1136 added_downstream_actor_id,
1137 removed_downstream_actor_id: reschedule
1138 .removed_actors
1139 .iter()
1140 .cloned()
1141 .collect(),
1142 },
1143 )
1144 .unwrap();
1145 }
1146 }
1147 }
1148 let dispatcher_update = dispatcher_update.into_values().collect();
1149
1150 let mut merge_update = HashMap::new();
1151 for (&fragment_id, reschedule) in reschedules {
1152 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1153 let downstream_actor_ids = fragment_actors
1155 .get(&downstream_fragment_id)
1156 .expect("should contain");
1157
1158 let downstream_removed_actors: HashSet<_> = reschedules
1162 .get(&downstream_fragment_id)
1163 .map(|downstream_reschedule| {
1164 downstream_reschedule
1165 .removed_actors
1166 .iter()
1167 .copied()
1168 .collect()
1169 })
1170 .unwrap_or_default();
1171
1172 for &actor_id in downstream_actor_ids {
1174 if downstream_removed_actors.contains(&actor_id) {
1175 continue;
1176 }
1177
1178 merge_update
1180 .try_insert(
1181 (actor_id, fragment_id),
1182 MergeUpdate {
1183 actor_id,
1184 upstream_fragment_id: fragment_id,
1185 new_upstream_fragment_id: None,
1186 added_upstream_actors: reschedule
1187 .added_actors
1188 .iter()
1189 .flat_map(|(worker_id, actors)| {
1190 let host =
1191 control_stream_manager.host_addr(*worker_id);
1192 actors.iter().map(move |&actor_id| PbActorInfo {
1193 actor_id,
1194 host: Some(host.clone()),
1195 })
1196 })
1197 .collect(),
1198 removed_upstream_actor_id: reschedule
1199 .removed_actors
1200 .iter()
1201 .cloned()
1202 .collect(),
1203 },
1204 )
1205 .unwrap();
1206 }
1207 }
1208 }
1209 let merge_update = merge_update.into_values().collect();
1210
1211 let mut actor_vnode_bitmap_update = HashMap::new();
1212 for reschedule in reschedules.values() {
1213 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1215 let bitmap = bitmap.to_protobuf();
1216 actor_vnode_bitmap_update
1217 .try_insert(actor_id, bitmap)
1218 .unwrap();
1219 }
1220 }
1221 let dropped_actors = reschedules
1222 .values()
1223 .flat_map(|r| r.removed_actors.iter().copied())
1224 .collect();
1225 let mut actor_splits = HashMap::new();
1226 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1227 let mut cdc_table_job_ids: HashSet<_> = HashSet::default();
1228 for reschedule in reschedules.values() {
1229 for (actor_id, splits) in &reschedule.actor_splits {
1230 actor_splits.insert(
1231 *actor_id,
1232 ConnectorSplits {
1233 splits: splits.iter().map(ConnectorSplit::from).collect(),
1234 },
1235 );
1236 }
1237 actor_cdc_table_snapshot_splits.extend(
1238 build_pb_actor_cdc_table_snapshot_splits(
1239 reschedule.cdc_table_snapshot_split_assignment.clone(),
1240 ),
1241 );
1242 if let Some(cdc_table_job_id) = reschedule.cdc_table_job_id {
1243 cdc_table_job_ids.insert(cdc_table_job_id);
1244 }
1245 }
1246
1247 let actor_new_dispatchers = HashMap::new();
1249 let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1250 {
1251 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1252 CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1253 )
1254 .into()
1255 } else {
1256 PbCdcTableSnapshotSplitsWithGeneration {
1257 splits: actor_cdc_table_snapshot_splits,
1258 generation: control_stream_manager
1259 .env
1260 .cdc_table_backfill_tracker
1261 .next_generation(cdc_table_job_ids.into_iter()),
1262 }
1263 .into()
1264 };
1265 let mutation = Mutation::Update(UpdateMutation {
1266 dispatcher_update,
1267 merge_update,
1268 actor_vnode_bitmap_update,
1269 dropped_actors,
1270 actor_splits,
1271 actor_new_dispatchers,
1272 actor_cdc_table_snapshot_splits,
1273 sink_add_columns: Default::default(),
1274 });
1275 tracing::debug!("update mutation: {mutation:?}");
1276 Some(mutation)
1277 }
1278
1279 Command::CreateSubscription {
1280 upstream_mv_table_id,
1281 subscription_id,
1282 ..
1283 } => Some(Mutation::Add(AddMutation {
1284 actor_dispatchers: Default::default(),
1285 added_actors: vec![],
1286 actor_splits: Default::default(),
1287 pause: false,
1288 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1289 upstream_mv_table_id: *upstream_mv_table_id,
1290 subscriber_id: subscription_id.as_raw_id(),
1291 }],
1292 backfill_nodes_to_pause: vec![],
1293 actor_cdc_table_snapshot_splits: Default::default(),
1294 new_upstream_sinks: Default::default(),
1295 })),
1296 Command::DropSubscription {
1297 upstream_mv_table_id,
1298 subscription_id,
1299 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1300 info: vec![SubscriptionUpstreamInfo {
1301 subscriber_id: subscription_id.as_raw_id(),
1302 upstream_mv_table_id: *upstream_mv_table_id,
1303 }],
1304 })),
1305 Command::ConnectorPropsChange(config) => {
1306 let mut connector_props_infos = HashMap::default();
1307 for (k, v) in config {
1308 connector_props_infos.insert(
1309 k.as_raw_id(),
1310 ConnectorPropsInfo {
1311 connector_props_info: v.clone(),
1312 },
1313 );
1314 }
1315 Some(Mutation::ConnectorPropsChange(
1316 ConnectorPropsChangeMutation {
1317 connector_props_infos,
1318 },
1319 ))
1320 }
1321 Command::StartFragmentBackfill { fragment_ids } => Some(
1322 Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1323 fragment_ids: fragment_ids.clone(),
1324 }),
1325 ),
1326 Command::Refresh {
1327 table_id,
1328 associated_source_id,
1329 } => Some(Mutation::RefreshStart(
1330 risingwave_pb::stream_plan::RefreshStartMutation {
1331 table_id: *table_id,
1332 associated_source_id: *associated_source_id,
1333 },
1334 )),
1335 Command::ListFinish {
1336 table_id: _,
1337 associated_source_id,
1338 } => Some(Mutation::ListFinish(ListFinishMutation {
1339 associated_source_id: *associated_source_id,
1340 })),
1341 Command::LoadFinish {
1342 table_id: _,
1343 associated_source_id,
1344 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1345 associated_source_id: *associated_source_id,
1346 })),
1347 }
1348 }
1349
1350 pub(super) fn actors_to_create(
1351 &self,
1352 graph_info: &InflightDatabaseInfo,
1353 edges: &mut Option<FragmentEdgeBuildResult>,
1354 control_stream_manager: &ControlStreamManager,
1355 ) -> Option<StreamJobActorsToCreate> {
1356 match self {
1357 Command::CreateStreamingJob { info, job_type, .. } => {
1358 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1359 return None;
1361 }
1362 let actors_to_create = info.stream_job_fragments.actors_to_create();
1363 let edges = edges.as_mut().expect("should exist");
1364 Some(edges.collect_actors_to_create(actors_to_create.map(
1365 |(fragment_id, node, actors)| {
1366 (
1367 fragment_id,
1368 node,
1369 actors,
1370 [], )
1372 },
1373 )))
1374 }
1375 Command::RescheduleFragment {
1376 reschedules,
1377 fragment_actors,
1378 ..
1379 } => {
1380 let mut actor_upstreams = Self::collect_actor_upstreams(
1381 reschedules.iter().map(|(fragment_id, reschedule)| {
1382 (
1383 *fragment_id,
1384 reschedule.newly_created_actors.values().map(
1385 |((actor, dispatchers), _)| {
1386 (actor.actor_id, dispatchers.as_slice())
1387 },
1388 ),
1389 )
1390 }),
1391 Some((reschedules, fragment_actors)),
1392 graph_info,
1393 control_stream_manager,
1394 );
1395 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1396 for (fragment_id, (actor, dispatchers), worker_id) in
1397 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1398 reschedule
1399 .newly_created_actors
1400 .values()
1401 .map(|(actors, status)| (*fragment_id, actors, status))
1402 })
1403 {
1404 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1405 map.entry(*worker_id)
1406 .or_default()
1407 .entry(fragment_id)
1408 .or_insert_with(|| {
1409 let node = graph_info.fragment(fragment_id).nodes.clone();
1410 let subscribers =
1411 graph_info.fragment_subscribers(fragment_id).collect();
1412 (node, vec![], subscribers)
1413 })
1414 .1
1415 .push((actor.clone(), upstreams, dispatchers.clone()));
1416 }
1417 Some(map)
1418 }
1419 Command::ReplaceStreamJob(replace_table) => {
1420 let edges = edges.as_mut().expect("should exist");
1421 let mut actors = edges.collect_actors_to_create(
1422 replace_table.new_fragments.actors_to_create().map(
1423 |(fragment_id, node, actors)| {
1424 (
1425 fragment_id,
1426 node,
1427 actors,
1428 graph_info
1429 .job_subscribers(replace_table.old_fragments.stream_job_id),
1430 )
1431 },
1432 ),
1433 );
1434 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1435 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1436 (
1437 sink.new_fragment.fragment_id,
1438 &sink.new_fragment.nodes,
1439 sink.new_fragment.actors.iter().map(|actor| {
1440 (
1441 actor,
1442 sink.actor_status[&actor.actor_id]
1443 .location
1444 .as_ref()
1445 .unwrap()
1446 .worker_node_id,
1447 )
1448 }),
1449 graph_info.job_subscribers(sink.original_sink.id.as_job_id()),
1450 )
1451 }));
1452 for (worker_id, fragment_actors) in sink_actors {
1453 actors.entry(worker_id).or_default().extend(fragment_actors);
1454 }
1455 }
1456 Some(actors)
1457 }
1458 _ => None,
1459 }
1460 }
1461
1462 fn generate_update_mutation_for_replace_table(
1463 dropped_actors: impl IntoIterator<Item = ActorId>,
1464 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1465 dispatchers: FragmentActorDispatchers,
1466 init_split_assignment: &SplitAssignment,
1467 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1468 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1469 ) -> Option<Mutation> {
1470 let dropped_actors = dropped_actors.into_iter().collect();
1471
1472 let actor_new_dispatchers = dispatchers
1473 .into_values()
1474 .flatten()
1475 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1476 .collect();
1477
1478 let actor_splits = init_split_assignment
1479 .values()
1480 .flat_map(build_actor_connector_splits)
1481 .collect();
1482 Some(Mutation::Update(UpdateMutation {
1483 actor_new_dispatchers,
1484 merge_update: merge_updates.into_values().flatten().collect(),
1485 dropped_actors,
1486 actor_splits,
1487 actor_cdc_table_snapshot_splits:
1488 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1489 cdc_table_snapshot_split_assignment,
1490 )
1491 .into(),
1492 sink_add_columns: auto_refresh_schema_sinks
1493 .as_ref()
1494 .into_iter()
1495 .flat_map(|sinks| {
1496 sinks.iter().map(|sink| {
1497 (
1498 sink.original_sink.id,
1499 PbSinkAddColumns {
1500 fields: sink
1501 .newly_add_fields
1502 .iter()
1503 .map(|field| field.to_prost())
1504 .collect(),
1505 },
1506 )
1507 })
1508 })
1509 .collect(),
1510 ..Default::default()
1511 }))
1512 }
1513
1514 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1516 match self {
1517 Command::DropStreamingJobs {
1518 streaming_job_ids, ..
1519 } => Some(streaming_job_ids.iter().cloned()),
1520 _ => None,
1521 }
1522 .into_iter()
1523 .flatten()
1524 }
1525}
1526
1527impl Command {
1528 #[expect(clippy::type_complexity)]
1529 pub(super) fn collect_actor_upstreams(
1530 actor_dispatchers: impl Iterator<
1531 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1532 >,
1533 reschedule_dispatcher_update: Option<(
1534 &HashMap<FragmentId, Reschedule>,
1535 &HashMap<FragmentId, HashSet<ActorId>>,
1536 )>,
1537 graph_info: &InflightDatabaseInfo,
1538 control_stream_manager: &ControlStreamManager,
1539 ) -> HashMap<ActorId, ActorUpstreams> {
1540 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1541 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1542 let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1543 for (upstream_actor_id, dispatchers) in upstream_actors {
1544 let upstream_actor_location =
1545 upstream_fragment.actors[&upstream_actor_id].worker_id;
1546 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1547 for downstream_actor_id in dispatchers
1548 .iter()
1549 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1550 {
1551 actor_upstreams
1552 .entry(*downstream_actor_id)
1553 .or_default()
1554 .entry(upstream_fragment_id)
1555 .or_default()
1556 .insert(
1557 upstream_actor_id,
1558 PbActorInfo {
1559 actor_id: upstream_actor_id,
1560 host: Some(upstream_actor_host.clone()),
1561 },
1562 );
1563 }
1564 }
1565 }
1566 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1567 for reschedule in reschedules.values() {
1568 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1569 let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1570 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1571 for upstream_actor_id in fragment_actors
1572 .get(upstream_fragment_id)
1573 .expect("should exist")
1574 {
1575 let upstream_actor_location =
1576 upstream_fragment.actors[upstream_actor_id].worker_id;
1577 let upstream_actor_host =
1578 control_stream_manager.host_addr(upstream_actor_location);
1579 if let Some(upstream_reschedule) = upstream_reschedule
1580 && upstream_reschedule
1581 .removed_actors
1582 .contains(upstream_actor_id)
1583 {
1584 continue;
1585 }
1586 for (_, downstream_actor_id) in
1587 reschedule
1588 .added_actors
1589 .iter()
1590 .flat_map(|(worker_id, actors)| {
1591 actors.iter().map(|actor| (*worker_id, *actor))
1592 })
1593 {
1594 actor_upstreams
1595 .entry(downstream_actor_id)
1596 .or_default()
1597 .entry(*upstream_fragment_id)
1598 .or_default()
1599 .insert(
1600 *upstream_actor_id,
1601 PbActorInfo {
1602 actor_id: *upstream_actor_id,
1603 host: Some(upstream_actor_host.clone()),
1604 },
1605 );
1606 }
1607 }
1608 }
1609 }
1610 }
1611 actor_upstreams
1612 }
1613}