1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
24use risingwave_common::id::{JobId, SourceId};
25use risingwave_common::must_match;
26use risingwave_common::types::Timestamptz;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::{
31 CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
32 build_pb_actor_cdc_table_snapshot_splits,
33 build_pb_actor_cdc_table_snapshot_splits_with_generation,
34};
35use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
36use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
37use risingwave_meta_model::WorkerId;
38use risingwave_pb::catalog::CreateType;
39use risingwave_pb::catalog::table::PbTableType;
40use risingwave_pb::common::PbActorInfo;
41use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
42use risingwave_pb::source::{
43 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
44};
45use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
46use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
47use risingwave_pb::stream_plan::barrier_mutation::Mutation;
48use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
49use risingwave_pb::stream_plan::stream_node::NodeBody;
50use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
51use risingwave_pb::stream_plan::update_mutation::*;
52use risingwave_pb::stream_plan::{
53 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
54 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
55 ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
56 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
57};
58use risingwave_pb::stream_service::BarrierCompleteResponse;
59use tracing::warn;
60
61use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
74};
75use crate::stream::{
76 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
77 SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
78};
79
/// Per-fragment description of an actor reschedule, carried by
/// `Command::RescheduleFragment` (one entry per rescheduled fragment).
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to this fragment, grouped by the worker they are placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps, keyed by actor. When updating in-flight fragment info,
    /// entries that belong to `newly_created_actors` are skipped.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher id)` pairs: every actor of the upstream
    /// fragment receives a `DispatcherUpdate` for that dispatcher.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Hash mapping placed into each of those `DispatcherUpdate`s; `None` leaves
    /// the update's `hash_mapping` unset.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments of this fragment. Their actors (except those removed by
    /// their own reschedule) receive a `MergeUpdate` listing this fragment's added
    /// and removed actors.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source splits per actor; becomes `actor_splits` of the `UpdateMutation`.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors created by this reschedule, each paired with a worker id
    /// (presumably the worker hosting the actor — confirm). Every actor listed in
    /// `added_actors` is expected to have an entry here.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    /// CDC table snapshot split assignment, merged into the `UpdateMutation`'s
    /// `actor_cdc_table_snapshot_splits`.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    /// Job id collected across all reschedules and handed to the CDC backfill
    /// tracker's `next_generation` when any CDC snapshot splits are present.
    pub cdc_table_job_id: Option<JobId>,
}
115
/// Plan for replacing the fragments of an existing streaming job with new ones,
/// carried by `Command::ReplaceStreamJob`.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced; each of them is removed.
    pub old_fragments: StreamJobFragments,
    /// Fragments of the new plan; each is registered as a new fragment of `streaming_job`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Keyed by the fragment whose upstream changes; the value maps an old upstream
    /// fragment id to the fragment id replacing it.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Keyed by upstream fragment id; selects which of the built edge dispatchers
    /// are included in the update mutation.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment used when building the new fragments' info.
    pub init_split_assignment: SplitAssignment,
    /// The job being replaced; its id owns the new fragments.
    pub streaming_job: StreamingJob,
    /// NOTE(review): presumably the temporary job id under which the new fragments
    /// were built — not read in this part of the file, confirm against callers.
    pub tmp_id: JobId,
    /// State tables to drop. Not consumed in this part of the file.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks replaced together with the job: each contributes a new fragment and
    /// removes its original fragment.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    /// CDC table snapshot split assignment; when non-empty it is wrapped with a fresh
    /// generation from the CDC backfill tracker before being put into the mutation.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
145
146impl ReplaceStreamJobPlan {
147 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
148 let mut fragment_changes = HashMap::new();
149 for (fragment_id, new_fragment) in self
150 .new_fragments
151 .new_fragment_info(&self.init_split_assignment)
152 {
153 let fragment_change = CommandFragmentChanges::NewFragment {
154 job_id: self.streaming_job.id(),
155 info: new_fragment,
156 is_existing: false,
157 };
158 fragment_changes
159 .try_insert(fragment_id, fragment_change)
160 .expect("non-duplicate");
161 }
162 for fragment in self.old_fragments.fragments.values() {
163 fragment_changes
164 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
165 .expect("non-duplicate");
166 }
167 for (fragment_id, replace_map) in &self.replace_upstream {
168 fragment_changes
169 .try_insert(
170 *fragment_id,
171 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
172 )
173 .expect("non-duplicate");
174 }
175 if let Some(sinks) = &self.auto_refresh_schema_sinks {
176 for sink in sinks {
177 let fragment_change = CommandFragmentChanges::NewFragment {
178 job_id: sink.original_sink.id.as_job_id(),
179 info: sink.new_fragment_info(),
180 is_existing: false,
181 };
182 fragment_changes
183 .try_insert(sink.new_fragment.fragment_id, fragment_change)
184 .expect("non-duplicate");
185 fragment_changes
186 .try_insert(
187 sink.original_fragment.fragment_id,
188 CommandFragmentChanges::RemoveFragment,
189 )
190 .expect("non-duplicate");
191 }
192 }
193 fragment_changes
194 }
195
196 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
198 let mut fragment_replacements = HashMap::new();
199 for (upstream_fragment_id, new_upstream_fragment_id) in
200 self.replace_upstream.values().flatten()
201 {
202 {
203 let r =
204 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
205 if let Some(r) = r {
206 assert_eq!(
207 *new_upstream_fragment_id, r,
208 "one fragment is replaced by multiple fragments"
209 );
210 }
211 }
212 }
213 fragment_replacements
214 }
215}
216
/// Payload of `Command::CreateStreamingJob`: everything needed to describe the job
/// being created.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the job to create. Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Keyed by upstream fragment id; selects which of the built edge dispatchers
    /// are put into the `AddMutation`.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the new actors.
    pub init_split_assignment: SplitAssignment,
    /// NOTE(review): presumably the SQL definition of the job — not read in this
    /// part of the file.
    pub definition: String,
    /// Type of the streaming job. Not read in this part of the file.
    pub job_type: StreamingJobType,
    /// Create type of the job. Not read in this part of the file.
    pub create_type: CreateType,
    /// The job being created; its id is attached to every new fragment.
    pub streaming_job: StreamingJob,
    /// Backfill ordering between fragments; used to derive the backfill nodes that
    /// start paused.
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    /// CDC table snapshot split assignment (with generation) sent in the `AddMutation`.
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Per-fragment list of state table ids. Not read in this part of the file.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}
232
233impl StreamJobFragments {
234 pub(super) fn new_fragment_info<'a>(
235 &'a self,
236 assignment: &'a SplitAssignment,
237 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
238 self.fragments.values().map(|fragment| {
239 let mut fragment_splits = assignment
240 .get(&fragment.fragment_id)
241 .cloned()
242 .unwrap_or_default();
243
244 (
245 fragment.fragment_id,
246 InflightFragmentInfo {
247 fragment_id: fragment.fragment_id,
248 distribution_type: fragment.distribution_type.into(),
249 fragment_type_mask: fragment.fragment_type_mask,
250 vnode_count: fragment.vnode_count(),
251 nodes: fragment.nodes.clone(),
252 actors: fragment
253 .actors
254 .iter()
255 .map(|actor| {
256 (
257 actor.actor_id,
258 InflightActorInfo {
259 worker_id: self
260 .actor_status
261 .get(&actor.actor_id)
262 .expect("should exist")
263 .worker_id(),
264 vnode_bitmap: actor.vnode_bitmap.clone(),
265 splits: fragment_splits
266 .remove(&actor.actor_id)
267 .unwrap_or_default(),
268 },
269 )
270 })
271 .collect(),
272 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
273 },
274 )
275 })
276 }
277}
278
/// Snapshot-backfill information of a streaming job.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized-view table id -> optional backfill epoch.
    ///
    /// When creating a snapshot-backfill job, each key becomes one subscription
    /// added on the corresponding upstream table.
    /// NOTE(review): the value is presumably `None` until the backfill epoch has
    /// been decided — confirm against the code that fills it.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
286
/// Flavour of a `Command::CreateStreamingJob`.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A job with no extra handling in this file.
    Normal,
    /// A sink into a table: in addition to the new fragments, the sink fragment is
    /// added as an upstream of the target's downstream fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// A snapshot-backfill job: subscriptions on its upstream tables are added, and
    /// its fragment changes are handled separately from `Command::fragment_changes`.
    SnapshotBackfill(SnapshotBackfillInfo),
}
293
/// A command carried by a barrier (see `CommandContext`).
///
/// `Command::to_mutation` turns a command into the optional barrier `Mutation`, and
/// `Command::fragment_changes` into the updates of the in-flight fragment info.
#[derive(Debug)]
pub enum Command {
    /// Produces no mutation and no fragment changes.
    Flush,

    /// Produces a `PauseMutation`, unless the stream is already paused.
    Pause,

    /// Produces a `ResumeMutation` if the stream is currently paused. This is the
    /// only command for which `need_checkpoint` returns `false`.
    Resume,

    /// Drops streaming jobs by stopping their actors (`StopMutation`).
    DropStreamingJobs {
        /// Ids of the dropped jobs (shown by the `Display` impl).
        streaming_job_ids: HashSet<JobId>,
        /// Actors to stop.
        actors: Vec<ActorId>,
        /// State tables to unregister. Not read in this part of the file.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments to remove from the in-flight fragment info.
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Target fragment -> sink fragments that no longer feed it. Each target gets
        /// a `DropNodeUpstream` change; all sink fragments are listed in the
        /// `StopMutation`.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a streaming job by adding its actors (`AddMutation`).
    CreateStreamingJob {
        /// Description of the job to create.
        info: CreateStreamingJobCommandInfo,
        /// Flavour of the creation; see [`CreateStreamingJobType`].
        job_type: CreateStreamingJobType,
        /// NOTE(review): presumably the snapshot-backfill info for upstream tables in
        /// other databases — not read in this part of the file.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Job id -> upstream table ids. Produces a `DropSubscriptionsMutation` dropping
    /// the subscription of each job on each of its upstream tables.
    MergeSnapshotBackfillStreamingJobs(HashMap<JobId, HashSet<TableId>>),

    /// Reschedules actors of some fragments (`UpdateMutation`).
    RescheduleFragment {
        /// The reschedule of each affected fragment.
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Actor ids per fragment. Must contain every upstream and downstream
        /// fragment referenced by `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// Replaces the fragments of an existing job; see [`ReplaceStreamJobPlan`].
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Changes the split assignment of source actors (`SourceChangeSplitMutation`).
    SourceChangeSplit(SplitState),

    /// Applies per-actor rate limits (`ThrottleMutation`).
    Throttle(ThrottleConfig),

    /// Creates a subscription on a materialized view; the subscription is added
    /// through an `AddMutation`.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// Retention of the subscription in seconds (per the field name). Not read
        /// in this part of the file.
        retention_second: u64,
    },

    /// Drops a subscription from a materialized view. Its mutation is built outside
    /// this part of the file.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Changes connector properties. Its mutation is built outside this part of the
    /// file; it causes no fragment changes.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Starts backfill for the given fragments. Causes no fragment changes.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    /// Refresh of a table and its associated source. Causes no fragment changes.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// "List finished" notification for a table and its associated source. Causes no
    /// fragment changes.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// "Load finished" notification for a table and its associated source. Causes no
    /// fragment changes.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}
410
411impl std::fmt::Display for Command {
413 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
414 match self {
415 Command::Flush => write!(f, "Flush"),
416 Command::Pause => write!(f, "Pause"),
417 Command::Resume => write!(f, "Resume"),
418 Command::DropStreamingJobs {
419 streaming_job_ids, ..
420 } => {
421 write!(
422 f,
423 "DropStreamingJobs: {}",
424 streaming_job_ids.iter().sorted().join(", ")
425 )
426 }
427 Command::CreateStreamingJob { info, .. } => {
428 write!(f, "CreateStreamingJob: {}", info.streaming_job)
429 }
430 Command::MergeSnapshotBackfillStreamingJobs(_) => {
431 write!(f, "MergeSnapshotBackfillStreamingJobs")
432 }
433 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
434 Command::ReplaceStreamJob(plan) => {
435 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
436 }
437 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
438 Command::Throttle(_) => write!(f, "Throttle"),
439 Command::CreateSubscription {
440 subscription_id, ..
441 } => write!(f, "CreateSubscription: {subscription_id}"),
442 Command::DropSubscription {
443 subscription_id, ..
444 } => write!(f, "DropSubscription: {subscription_id}"),
445 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
446 Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
447 Command::Refresh {
448 table_id,
449 associated_source_id,
450 } => write!(
451 f,
452 "Refresh: {} (source: {})",
453 table_id, associated_source_id
454 ),
455 Command::ListFinish {
456 table_id,
457 associated_source_id,
458 } => write!(
459 f,
460 "ListFinish: {} (source: {})",
461 table_id, associated_source_id
462 ),
463 Command::LoadFinish {
464 table_id,
465 associated_source_id,
466 } => write!(
467 f,
468 "LoadFinish: {} (source: {})",
469 table_id, associated_source_id
470 ),
471 }
472 }
473}
474
475impl Command {
476 pub fn pause() -> Self {
477 Self::Pause
478 }
479
480 pub fn resume() -> Self {
481 Self::Resume
482 }
483
484 pub(super) fn fragment_changes(
485 &self,
486 ) -> Option<(Option<JobId>, HashMap<FragmentId, CommandFragmentChanges>)> {
487 match self {
488 Command::Flush => None,
489 Command::Pause => None,
490 Command::Resume => None,
491 Command::DropStreamingJobs {
492 unregistered_fragment_ids,
493 dropped_sink_fragment_by_targets,
494 ..
495 } => {
496 let changes = unregistered_fragment_ids
497 .iter()
498 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
499 .chain(dropped_sink_fragment_by_targets.iter().map(
500 |(target_fragment, sink_fragments)| {
501 (
502 *target_fragment,
503 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
504 )
505 },
506 ))
507 .collect();
508
509 Some((None, changes))
510 }
511 Command::CreateStreamingJob { info, job_type, .. } => {
512 assert!(
513 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
514 "should handle fragment changes separately for snapshot backfill"
515 );
516 let mut changes: HashMap<_, _> = info
517 .stream_job_fragments
518 .new_fragment_info(&info.init_split_assignment)
519 .map(|(fragment_id, fragment_infos)| {
520 (
521 fragment_id,
522 CommandFragmentChanges::NewFragment {
523 job_id: info.streaming_job.id(),
524 info: fragment_infos,
525 is_existing: false,
526 },
527 )
528 })
529 .collect();
530
531 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
532 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
533 changes.insert(
534 downstream_fragment_id,
535 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
536 upstream_fragment_id: ctx.sink_fragment_id,
537 sink_output_schema: ctx.sink_output_fields.clone(),
538 project_exprs: ctx.project_exprs.clone(),
539 }),
540 );
541 }
542
543 Some((Some(info.streaming_job.id()), changes))
544 }
545 Command::RescheduleFragment { reschedules, .. } => Some((
546 None,
547 reschedules
548 .iter()
549 .map(|(fragment_id, reschedule)| {
550 (
551 *fragment_id,
552 CommandFragmentChanges::Reschedule {
553 new_actors: reschedule
554 .added_actors
555 .iter()
556 .flat_map(|(node_id, actors)| {
557 actors.iter().map(|actor_id| {
558 (
559 *actor_id,
560 InflightActorInfo {
561 worker_id: *node_id,
562 vnode_bitmap: reschedule
563 .newly_created_actors
564 .get(actor_id)
565 .expect("should exist")
566 .0
567 .0
568 .vnode_bitmap
569 .clone(),
570 splits: reschedule
571 .actor_splits
572 .get(actor_id)
573 .cloned()
574 .unwrap_or_default(),
575 },
576 )
577 })
578 })
579 .collect(),
580 actor_update_vnode_bitmap: reschedule
581 .vnode_bitmap_updates
582 .iter()
583 .filter(|(actor_id, _)| {
584 !reschedule.newly_created_actors.contains_key(actor_id)
586 })
587 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
588 .collect(),
589 to_remove: reschedule.removed_actors.iter().cloned().collect(),
590 actor_splits: reschedule.actor_splits.clone(),
591 },
592 )
593 })
594 .collect(),
595 )),
596 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
597 Command::MergeSnapshotBackfillStreamingJobs(_) => None,
598 Command::SourceChangeSplit(SplitState {
599 split_assignment, ..
600 }) => Some((
601 None,
602 split_assignment
603 .iter()
604 .map(|(&fragment_id, splits)| {
605 (
606 fragment_id,
607 CommandFragmentChanges::SplitAssignment {
608 actor_splits: splits.clone(),
609 },
610 )
611 })
612 .collect(),
613 )),
614 Command::Throttle(_) => None,
615 Command::CreateSubscription { .. } => None,
616 Command::DropSubscription { .. } => None,
617 Command::ConnectorPropsChange(_) => None,
618 Command::StartFragmentBackfill { .. } => None,
619 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, }
623 }
624
625 pub fn need_checkpoint(&self) -> bool {
626 !matches!(self, Command::Resume)
628 }
629}
630
/// Kind of a barrier; mirrors the protobuf `BarrierKind` (see `to_protobuf`).
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// Maps to the protobuf `Initial` kind.
    Initial,
    /// Maps to the protobuf `Barrier` kind (a barrier that is not a checkpoint).
    Barrier,
    /// A checkpoint barrier. The carried epochs are passed to the table change-log
    /// builder when commit info is collected.
    /// NOTE(review): presumably the prev-epochs covered by this checkpoint — confirm
    /// against where the kind is constructed.
    Checkpoint(Vec<u64>),
}
638
639impl BarrierKind {
640 pub fn to_protobuf(&self) -> PbBarrierKind {
641 match self {
642 BarrierKind::Initial => PbBarrierKind::Initial,
643 BarrierKind::Barrier => PbBarrierKind::Barrier,
644 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
645 }
646 }
647
648 pub fn is_checkpoint(&self) -> bool {
649 matches!(self, BarrierKind::Checkpoint(_))
650 }
651
652 pub fn is_initial(&self) -> bool {
653 matches!(self, BarrierKind::Initial)
654 }
655
656 pub fn as_str_name(&self) -> &'static str {
657 match self {
658 BarrierKind::Initial => "Initial",
659 BarrierKind::Barrier => "Barrier",
660 BarrierKind::Checkpoint(_) => "Checkpoint",
661 }
662 }
663}
664
/// A barrier together with the optional [`Command`] it carries, plus what is needed
/// to collect commit information once the barrier completes.
pub(super) struct CommandContext {
    /// Materialized-view table id -> retention in seconds, used to derive the epoch
    /// up to which the table's change log is truncated.
    /// NOTE(review): per the name, the maximum retention among the table's
    /// subscriptions — confirm against the caller of `new`.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    /// Epochs and kind of the barrier.
    pub(super) barrier_info: BarrierInfo,

    /// Tables committed at the barrier's previous epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command carried by the barrier, if any.
    pub(super) command: Option<Command>,

    /// Tracing span stored with the context; not read in this part of the file.
    _span: tracing::Span,
}
683
684impl std::fmt::Debug for CommandContext {
685 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
686 f.debug_struct("CommandContext")
687 .field("barrier_info", &self.barrier_info)
688 .field("command", &self.command)
689 .finish()
690 }
691}
692
693impl std::fmt::Display for CommandContext {
694 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
695 write!(
696 f,
697 "prev_epoch={}, curr_epoch={}, kind={}",
698 self.barrier_info.prev_epoch.value().0,
699 self.barrier_info.curr_epoch.value().0,
700 self.barrier_info.kind.as_str_name()
701 )?;
702 if let Some(command) = &self.command {
703 write!(f, ", command={}", command)?;
704 }
705 Ok(())
706 }
707}
708
709impl CommandContext {
710 pub(super) fn new(
711 barrier_info: BarrierInfo,
712 mv_subscription_max_retention: HashMap<TableId, u64>,
713 table_ids_to_commit: HashSet<TableId>,
714 command: Option<Command>,
715 span: tracing::Span,
716 ) -> Self {
717 Self {
718 mv_subscription_max_retention,
719 barrier_info,
720 table_ids_to_commit,
721 command,
722 _span: span,
723 }
724 }
725
726 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
727 let Some(truncate_timestamptz) = Timestamptz::from_secs(
728 self.barrier_info
729 .prev_epoch
730 .value()
731 .as_timestamptz()
732 .timestamp()
733 - retention_second as i64,
734 ) else {
735 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
736 return self.barrier_info.prev_epoch.value();
737 };
738 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
739 }
740
741 pub(super) fn collect_commit_epoch_info(
742 &self,
743 info: &mut CommitEpochInfo,
744 resps: Vec<BarrierCompleteResponse>,
745 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
746 ) {
747 let (
748 sst_to_context,
749 synced_ssts,
750 new_table_watermarks,
751 old_value_ssts,
752 vector_index_adds,
753 truncate_tables,
754 ) = collect_resp_info(resps);
755
756 let new_table_fragment_infos =
757 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
758 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
759 {
760 let table_fragments = &info.stream_job_fragments;
761 let mut table_ids: HashSet<_> =
762 table_fragments.internal_table_ids().into_iter().collect();
763 if let Some(mv_table_id) = table_fragments.mv_table_id() {
764 table_ids.insert(mv_table_id);
765 }
766
767 vec![NewTableFragmentInfo { table_ids }]
768 } else {
769 vec![]
770 };
771
772 let mut mv_log_store_truncate_epoch = HashMap::new();
773 let mut update_truncate_epoch =
775 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
776 Entry::Occupied(mut entry) => {
777 let prev_truncate_epoch = entry.get_mut();
778 if truncate_epoch < *prev_truncate_epoch {
779 *prev_truncate_epoch = truncate_epoch;
780 }
781 }
782 Entry::Vacant(entry) => {
783 entry.insert(truncate_epoch);
784 }
785 };
786 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
787 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
788 update_truncate_epoch(*mv_table_id, truncate_epoch);
789 }
790 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
791 for mv_table_id in upstream_mv_table_ids {
792 update_truncate_epoch(mv_table_id, backfill_epoch);
793 }
794 }
795
796 let table_new_change_log = build_table_change_log_delta(
797 old_value_ssts.into_iter(),
798 synced_ssts.iter().map(|sst| &sst.sst_info),
799 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
800 mv_log_store_truncate_epoch.into_iter(),
801 );
802
803 let epoch = self.barrier_info.prev_epoch();
804 for table_id in &self.table_ids_to_commit {
805 info.tables_to_commit
806 .try_insert(*table_id, epoch)
807 .expect("non duplicate");
808 }
809
810 info.sstables.extend(synced_ssts);
811 info.new_table_watermarks.extend(new_table_watermarks);
812 info.sst_to_context.extend(sst_to_context);
813 info.new_table_fragment_infos
814 .extend(new_table_fragment_infos);
815 info.change_log_delta.extend(table_new_change_log);
816 for (table_id, vector_index_adds) in vector_index_adds {
817 info.vector_index_delta
818 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
819 .expect("non-duplicate");
820 }
821 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
822 for fragment in job_info.stream_job_fragments.fragments.values() {
823 visit_stream_node_cont(&fragment.nodes, |node| {
824 match node.node_body.as_ref().unwrap() {
825 NodeBody::VectorIndexWrite(vector_index_write) => {
826 let index_table = vector_index_write.table.as_ref().unwrap();
827 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
828 info.vector_index_delta
829 .try_insert(
830 index_table.id,
831 VectorIndexDelta::Init(PbVectorIndexInit {
832 info: Some(index_table.vector_index_info.unwrap()),
833 }),
834 )
835 .expect("non-duplicate");
836 false
837 }
838 _ => true,
839 }
840 })
841 }
842 }
843 info.truncate_tables.extend(truncate_tables);
844 }
845}
846
847impl Command {
848 pub(super) fn to_mutation(
852 &self,
853 is_currently_paused: bool,
854 edges: &mut Option<FragmentEdgeBuildResult>,
855 control_stream_manager: &ControlStreamManager,
856 ) -> Option<Mutation> {
857 match self {
858 Command::Flush => None,
859
860 Command::Pause => {
861 if !is_currently_paused {
864 Some(Mutation::Pause(PauseMutation {}))
865 } else {
866 None
867 }
868 }
869
870 Command::Resume => {
871 if is_currently_paused {
873 Some(Mutation::Resume(ResumeMutation {}))
874 } else {
875 None
876 }
877 }
878
879 Command::SourceChangeSplit(SplitState {
880 split_assignment, ..
881 }) => {
882 let mut diff = HashMap::new();
883
884 for actor_splits in split_assignment.values() {
885 diff.extend(actor_splits.clone());
886 }
887
888 Some(Mutation::Splits(SourceChangeSplitMutation {
889 actor_splits: build_actor_connector_splits(&diff),
890 }))
891 }
892
893 Command::Throttle(config) => {
894 let mut actor_to_apply = HashMap::new();
895 for per_fragment in config.values() {
896 actor_to_apply.extend(
897 per_fragment
898 .iter()
899 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
900 );
901 }
902
903 Some(Mutation::Throttle(ThrottleMutation {
904 actor_throttle: actor_to_apply,
905 }))
906 }
907
908 Command::DropStreamingJobs {
909 actors,
910 dropped_sink_fragment_by_targets,
911 ..
912 } => Some(Mutation::Stop(StopMutation {
913 actors: actors.clone(),
914 dropped_sink_fragments: dropped_sink_fragment_by_targets
915 .values()
916 .flatten()
917 .cloned()
918 .collect(),
919 })),
920
921 Command::CreateStreamingJob {
922 info:
923 CreateStreamingJobCommandInfo {
924 stream_job_fragments: table_fragments,
925 init_split_assignment: split_assignment,
926 upstream_fragment_downstreams,
927 fragment_backfill_ordering,
928 cdc_table_snapshot_split_assignment,
929 ..
930 },
931 job_type,
932 ..
933 } => {
934 let edges = edges.as_mut().expect("should exist");
935 let added_actors = table_fragments.actor_ids().collect();
936 let actor_splits = split_assignment
937 .values()
938 .flat_map(build_actor_connector_splits)
939 .collect();
940 let subscriptions_to_add =
941 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
942 job_type
943 {
944 snapshot_backfill_info
945 .upstream_mv_table_id_to_backfill_epoch
946 .keys()
947 .map(|table_id| SubscriptionUpstreamInfo {
948 subscriber_id: table_fragments.stream_job_id().as_subscriber_id(),
949 upstream_mv_table_id: *table_id,
950 })
951 .collect()
952 } else {
953 Default::default()
954 };
955 let backfill_nodes_to_pause: Vec<_> =
956 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
957 .into_iter()
958 .collect();
959
960 let new_upstream_sinks =
961 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
962 sink_fragment_id,
963 sink_output_fields,
964 project_exprs,
965 new_sink_downstream,
966 ..
967 }) = job_type
968 {
969 let new_sink_actors = table_fragments
970 .actors_to_create()
971 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
972 .exactly_one()
973 .map(|(_, _, actors)| {
974 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
975 actor_id: actor.actor_id,
976 host: Some(control_stream_manager.host_addr(worker_id)),
977 })
978 })
979 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
980 let new_upstream_sink = PbNewUpstreamSink {
981 info: Some(PbUpstreamSinkInfo {
982 upstream_fragment_id: *sink_fragment_id,
983 sink_output_schema: sink_output_fields.clone(),
984 project_exprs: project_exprs.clone(),
985 }),
986 upstream_actors: new_sink_actors.collect(),
987 };
988 HashMap::from([(
989 new_sink_downstream.downstream_fragment_id,
990 new_upstream_sink,
991 )])
992 } else {
993 HashMap::new()
994 };
995
996 let add_mutation = AddMutation {
997 actor_dispatchers: edges
998 .dispatchers
999 .extract_if(|fragment_id, _| {
1000 upstream_fragment_downstreams.contains_key(fragment_id)
1001 })
1002 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1003 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1004 .collect(),
1005 added_actors,
1006 actor_splits,
1007 pause: is_currently_paused,
1009 subscriptions_to_add,
1010 backfill_nodes_to_pause,
1011 actor_cdc_table_snapshot_splits:
1012 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1013 cdc_table_snapshot_split_assignment.clone(),
1014 )
1015 .into(),
1016 new_upstream_sinks,
1017 };
1018
1019 Some(Mutation::Add(add_mutation))
1020 }
1021 Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
1022 Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1023 info: jobs_to_merge
1024 .iter()
1025 .flat_map(|(job_id, upstream_tables)| {
1026 upstream_tables.iter().map(move |upstream_table_id| {
1027 SubscriptionUpstreamInfo {
1028 subscriber_id: job_id.as_subscriber_id(),
1029 upstream_mv_table_id: *upstream_table_id,
1030 }
1031 })
1032 })
1033 .collect(),
1034 }))
1035 }
1036
1037 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1038 old_fragments,
1039 replace_upstream,
1040 upstream_fragment_downstreams,
1041 init_split_assignment,
1042 auto_refresh_schema_sinks,
1043 cdc_table_snapshot_split_assignment,
1044 ..
1045 }) => {
1046 let edges = edges.as_mut().expect("should exist");
1047 let merge_updates = edges
1048 .merge_updates
1049 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1050 .collect();
1051 let dispatchers = edges
1052 .dispatchers
1053 .extract_if(|fragment_id, _| {
1054 upstream_fragment_downstreams.contains_key(fragment_id)
1055 })
1056 .collect();
1057 let cdc_table_snapshot_split_assignment =
1058 if cdc_table_snapshot_split_assignment.is_empty() {
1059 CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1060 } else {
1061 CdcTableSnapshotSplitAssignmentWithGeneration::new(
1062 cdc_table_snapshot_split_assignment.clone(),
1063 control_stream_manager
1064 .env
1065 .cdc_table_backfill_tracker
1066 .next_generation(iter::once(old_fragments.stream_job_id)),
1067 )
1068 };
1069 Self::generate_update_mutation_for_replace_table(
1070 old_fragments.actor_ids().chain(
1071 auto_refresh_schema_sinks
1072 .as_ref()
1073 .into_iter()
1074 .flat_map(|sinks| {
1075 sinks.iter().flat_map(|sink| {
1076 sink.original_fragment
1077 .actors
1078 .iter()
1079 .map(|actor| actor.actor_id)
1080 })
1081 }),
1082 ),
1083 merge_updates,
1084 dispatchers,
1085 init_split_assignment,
1086 cdc_table_snapshot_split_assignment,
1087 auto_refresh_schema_sinks.as_ref(),
1088 )
1089 }
1090
1091 Command::RescheduleFragment {
1092 reschedules,
1093 fragment_actors,
1094 ..
1095 } => {
1096 let mut dispatcher_update = HashMap::new();
1097 for reschedule in reschedules.values() {
1098 for &(upstream_fragment_id, dispatcher_id) in
1099 &reschedule.upstream_fragment_dispatcher_ids
1100 {
1101 let upstream_actor_ids = fragment_actors
1103 .get(&upstream_fragment_id)
1104 .expect("should contain");
1105
1106 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1107
1108 for &actor_id in upstream_actor_ids {
1110 let added_downstream_actor_id = if upstream_reschedule
1111 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1112 .unwrap_or(true)
1113 {
1114 reschedule
1115 .added_actors
1116 .values()
1117 .flatten()
1118 .cloned()
1119 .collect()
1120 } else {
1121 Default::default()
1122 };
1123 dispatcher_update
1125 .try_insert(
1126 (actor_id, dispatcher_id),
1127 DispatcherUpdate {
1128 actor_id,
1129 dispatcher_id,
1130 hash_mapping: reschedule
1131 .upstream_dispatcher_mapping
1132 .as_ref()
1133 .map(|m| m.to_protobuf()),
1134 added_downstream_actor_id,
1135 removed_downstream_actor_id: reschedule
1136 .removed_actors
1137 .iter()
1138 .cloned()
1139 .collect(),
1140 },
1141 )
1142 .unwrap();
1143 }
1144 }
1145 }
1146 let dispatcher_update = dispatcher_update.into_values().collect();
1147
1148 let mut merge_update = HashMap::new();
1149 for (&fragment_id, reschedule) in reschedules {
1150 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1151 let downstream_actor_ids = fragment_actors
1153 .get(&downstream_fragment_id)
1154 .expect("should contain");
1155
1156 let downstream_removed_actors: HashSet<_> = reschedules
1160 .get(&downstream_fragment_id)
1161 .map(|downstream_reschedule| {
1162 downstream_reschedule
1163 .removed_actors
1164 .iter()
1165 .copied()
1166 .collect()
1167 })
1168 .unwrap_or_default();
1169
1170 for &actor_id in downstream_actor_ids {
1172 if downstream_removed_actors.contains(&actor_id) {
1173 continue;
1174 }
1175
1176 merge_update
1178 .try_insert(
1179 (actor_id, fragment_id),
1180 MergeUpdate {
1181 actor_id,
1182 upstream_fragment_id: fragment_id,
1183 new_upstream_fragment_id: None,
1184 added_upstream_actors: reschedule
1185 .added_actors
1186 .iter()
1187 .flat_map(|(worker_id, actors)| {
1188 let host =
1189 control_stream_manager.host_addr(*worker_id);
1190 actors.iter().map(move |&actor_id| PbActorInfo {
1191 actor_id,
1192 host: Some(host.clone()),
1193 })
1194 })
1195 .collect(),
1196 removed_upstream_actor_id: reschedule
1197 .removed_actors
1198 .iter()
1199 .cloned()
1200 .collect(),
1201 },
1202 )
1203 .unwrap();
1204 }
1205 }
1206 }
1207 let merge_update = merge_update.into_values().collect();
1208
1209 let mut actor_vnode_bitmap_update = HashMap::new();
1210 for reschedule in reschedules.values() {
1211 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1213 let bitmap = bitmap.to_protobuf();
1214 actor_vnode_bitmap_update
1215 .try_insert(actor_id, bitmap)
1216 .unwrap();
1217 }
1218 }
1219 let dropped_actors = reschedules
1220 .values()
1221 .flat_map(|r| r.removed_actors.iter().copied())
1222 .collect();
1223 let mut actor_splits = HashMap::new();
1224 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1225 let mut cdc_table_job_ids: HashSet<_> = HashSet::default();
1226 for reschedule in reschedules.values() {
1227 for (actor_id, splits) in &reschedule.actor_splits {
1228 actor_splits.insert(
1229 *actor_id,
1230 ConnectorSplits {
1231 splits: splits.iter().map(ConnectorSplit::from).collect(),
1232 },
1233 );
1234 }
1235 actor_cdc_table_snapshot_splits.extend(
1236 build_pb_actor_cdc_table_snapshot_splits(
1237 reschedule.cdc_table_snapshot_split_assignment.clone(),
1238 ),
1239 );
1240 if let Some(cdc_table_job_id) = reschedule.cdc_table_job_id {
1241 cdc_table_job_ids.insert(cdc_table_job_id);
1242 }
1243 }
1244
1245 let actor_new_dispatchers = HashMap::new();
1247 let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1248 {
1249 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1250 CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1251 )
1252 .into()
1253 } else {
1254 PbCdcTableSnapshotSplitsWithGeneration {
1255 splits: actor_cdc_table_snapshot_splits,
1256 generation: control_stream_manager
1257 .env
1258 .cdc_table_backfill_tracker
1259 .next_generation(cdc_table_job_ids.into_iter()),
1260 }
1261 .into()
1262 };
1263 let mutation = Mutation::Update(UpdateMutation {
1264 dispatcher_update,
1265 merge_update,
1266 actor_vnode_bitmap_update,
1267 dropped_actors,
1268 actor_splits,
1269 actor_new_dispatchers,
1270 actor_cdc_table_snapshot_splits,
1271 sink_add_columns: Default::default(),
1272 });
1273 tracing::debug!("update mutation: {mutation:?}");
1274 Some(mutation)
1275 }
1276
1277 Command::CreateSubscription {
1278 upstream_mv_table_id,
1279 subscription_id,
1280 ..
1281 } => Some(Mutation::Add(AddMutation {
1282 actor_dispatchers: Default::default(),
1283 added_actors: vec![],
1284 actor_splits: Default::default(),
1285 pause: false,
1286 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1287 upstream_mv_table_id: *upstream_mv_table_id,
1288 subscriber_id: subscription_id.as_subscriber_id(),
1289 }],
1290 backfill_nodes_to_pause: vec![],
1291 actor_cdc_table_snapshot_splits: Default::default(),
1292 new_upstream_sinks: Default::default(),
1293 })),
1294 Command::DropSubscription {
1295 upstream_mv_table_id,
1296 subscription_id,
1297 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1298 info: vec![SubscriptionUpstreamInfo {
1299 subscriber_id: subscription_id.as_subscriber_id(),
1300 upstream_mv_table_id: *upstream_mv_table_id,
1301 }],
1302 })),
1303 Command::ConnectorPropsChange(config) => {
1304 let mut connector_props_infos = HashMap::default();
1305 for (k, v) in config {
1306 connector_props_infos.insert(
1307 k.as_raw_id(),
1308 ConnectorPropsInfo {
1309 connector_props_info: v.clone(),
1310 },
1311 );
1312 }
1313 Some(Mutation::ConnectorPropsChange(
1314 ConnectorPropsChangeMutation {
1315 connector_props_infos,
1316 },
1317 ))
1318 }
1319 Command::StartFragmentBackfill { fragment_ids } => Some(
1320 Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1321 fragment_ids: fragment_ids.clone(),
1322 }),
1323 ),
1324 Command::Refresh {
1325 table_id,
1326 associated_source_id,
1327 } => Some(Mutation::RefreshStart(
1328 risingwave_pb::stream_plan::RefreshStartMutation {
1329 table_id: *table_id,
1330 associated_source_id: *associated_source_id,
1331 },
1332 )),
1333 Command::ListFinish {
1334 table_id: _,
1335 associated_source_id,
1336 } => Some(Mutation::ListFinish(ListFinishMutation {
1337 associated_source_id: *associated_source_id,
1338 })),
1339 Command::LoadFinish {
1340 table_id: _,
1341 associated_source_id,
1342 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1343 associated_source_id: *associated_source_id,
1344 })),
1345 }
1346 }
1347
1348 pub(super) fn actors_to_create(
1349 &self,
1350 database_info: &InflightDatabaseInfo,
1351 edges: &mut Option<FragmentEdgeBuildResult>,
1352 control_stream_manager: &ControlStreamManager,
1353 ) -> Option<StreamJobActorsToCreate> {
1354 match self {
1355 Command::CreateStreamingJob { info, job_type, .. } => {
1356 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1357 return None;
1359 }
1360 let actors_to_create = info.stream_job_fragments.actors_to_create();
1361 let edges = edges.as_mut().expect("should exist");
1362 Some(edges.collect_actors_to_create(actors_to_create.map(
1363 |(fragment_id, node, actors)| {
1364 (
1365 fragment_id,
1366 node,
1367 actors,
1368 [], )
1370 },
1371 )))
1372 }
1373 Command::RescheduleFragment {
1374 reschedules,
1375 fragment_actors,
1376 ..
1377 } => {
1378 let mut actor_upstreams = Self::collect_actor_upstreams(
1379 reschedules.iter().map(|(fragment_id, reschedule)| {
1380 (
1381 *fragment_id,
1382 reschedule.newly_created_actors.values().map(
1383 |((actor, dispatchers), _)| {
1384 (actor.actor_id, dispatchers.as_slice())
1385 },
1386 ),
1387 )
1388 }),
1389 Some((reschedules, fragment_actors)),
1390 database_info,
1391 control_stream_manager,
1392 );
1393 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1394 for (fragment_id, (actor, dispatchers), worker_id) in
1395 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1396 reschedule
1397 .newly_created_actors
1398 .values()
1399 .map(|(actors, status)| (*fragment_id, actors, status))
1400 })
1401 {
1402 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1403 map.entry(*worker_id)
1404 .or_default()
1405 .entry(fragment_id)
1406 .or_insert_with(|| {
1407 let node = database_info.fragment(fragment_id).nodes.clone();
1408 let subscribers =
1409 database_info.fragment_subscribers(fragment_id).collect();
1410 (node, vec![], subscribers)
1411 })
1412 .1
1413 .push((actor.clone(), upstreams, dispatchers.clone()));
1414 }
1415 Some(map)
1416 }
1417 Command::ReplaceStreamJob(replace_table) => {
1418 let edges = edges.as_mut().expect("should exist");
1419 let mut actors = edges.collect_actors_to_create(
1420 replace_table.new_fragments.actors_to_create().map(
1421 |(fragment_id, node, actors)| {
1422 (
1423 fragment_id,
1424 node,
1425 actors,
1426 database_info
1427 .job_subscribers(replace_table.old_fragments.stream_job_id),
1428 )
1429 },
1430 ),
1431 );
1432 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1433 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1434 (
1435 sink.new_fragment.fragment_id,
1436 &sink.new_fragment.nodes,
1437 sink.new_fragment.actors.iter().map(|actor| {
1438 (
1439 actor,
1440 sink.actor_status[&actor.actor_id]
1441 .location
1442 .as_ref()
1443 .unwrap()
1444 .worker_node_id,
1445 )
1446 }),
1447 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1448 )
1449 }));
1450 for (worker_id, fragment_actors) in sink_actors {
1451 actors.entry(worker_id).or_default().extend(fragment_actors);
1452 }
1453 }
1454 Some(actors)
1455 }
1456 _ => None,
1457 }
1458 }
1459
1460 fn generate_update_mutation_for_replace_table(
1461 dropped_actors: impl IntoIterator<Item = ActorId>,
1462 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1463 dispatchers: FragmentActorDispatchers,
1464 init_split_assignment: &SplitAssignment,
1465 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1466 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1467 ) -> Option<Mutation> {
1468 let dropped_actors = dropped_actors.into_iter().collect();
1469
1470 let actor_new_dispatchers = dispatchers
1471 .into_values()
1472 .flatten()
1473 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1474 .collect();
1475
1476 let actor_splits = init_split_assignment
1477 .values()
1478 .flat_map(build_actor_connector_splits)
1479 .collect();
1480 Some(Mutation::Update(UpdateMutation {
1481 actor_new_dispatchers,
1482 merge_update: merge_updates.into_values().flatten().collect(),
1483 dropped_actors,
1484 actor_splits,
1485 actor_cdc_table_snapshot_splits:
1486 build_pb_actor_cdc_table_snapshot_splits_with_generation(
1487 cdc_table_snapshot_split_assignment,
1488 )
1489 .into(),
1490 sink_add_columns: auto_refresh_schema_sinks
1491 .as_ref()
1492 .into_iter()
1493 .flat_map(|sinks| {
1494 sinks.iter().map(|sink| {
1495 (
1496 sink.original_sink.id,
1497 PbSinkAddColumns {
1498 fields: sink
1499 .newly_add_fields
1500 .iter()
1501 .map(|field| field.to_prost())
1502 .collect(),
1503 },
1504 )
1505 })
1506 })
1507 .collect(),
1508 ..Default::default()
1509 }))
1510 }
1511
1512 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1514 match self {
1515 Command::DropStreamingJobs {
1516 streaming_job_ids, ..
1517 } => Some(streaming_job_ids.iter().cloned()),
1518 _ => None,
1519 }
1520 .into_iter()
1521 .flatten()
1522 }
1523}
1524
1525impl Command {
1526 #[expect(clippy::type_complexity)]
1527 pub(super) fn collect_actor_upstreams(
1528 actor_dispatchers: impl Iterator<
1529 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1530 >,
1531 reschedule_dispatcher_update: Option<(
1532 &HashMap<FragmentId, Reschedule>,
1533 &HashMap<FragmentId, HashSet<ActorId>>,
1534 )>,
1535 database_info: &InflightDatabaseInfo,
1536 control_stream_manager: &ControlStreamManager,
1537 ) -> HashMap<ActorId, ActorUpstreams> {
1538 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1539 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1540 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1541 for (upstream_actor_id, dispatchers) in upstream_actors {
1542 let upstream_actor_location =
1543 upstream_fragment.actors[&upstream_actor_id].worker_id;
1544 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1545 for downstream_actor_id in dispatchers
1546 .iter()
1547 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1548 {
1549 actor_upstreams
1550 .entry(*downstream_actor_id)
1551 .or_default()
1552 .entry(upstream_fragment_id)
1553 .or_default()
1554 .insert(
1555 upstream_actor_id,
1556 PbActorInfo {
1557 actor_id: upstream_actor_id,
1558 host: Some(upstream_actor_host.clone()),
1559 },
1560 );
1561 }
1562 }
1563 }
1564 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1565 for reschedule in reschedules.values() {
1566 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1567 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1568 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1569 for upstream_actor_id in fragment_actors
1570 .get(upstream_fragment_id)
1571 .expect("should exist")
1572 {
1573 let upstream_actor_location =
1574 upstream_fragment.actors[upstream_actor_id].worker_id;
1575 let upstream_actor_host =
1576 control_stream_manager.host_addr(upstream_actor_location);
1577 if let Some(upstream_reschedule) = upstream_reschedule
1578 && upstream_reschedule
1579 .removed_actors
1580 .contains(upstream_actor_id)
1581 {
1582 continue;
1583 }
1584 for (_, downstream_actor_id) in
1585 reschedule
1586 .added_actors
1587 .iter()
1588 .flat_map(|(worker_id, actors)| {
1589 actors.iter().map(|actor| (*worker_id, *actor))
1590 })
1591 {
1592 actor_upstreams
1593 .entry(downstream_actor_id)
1594 .or_default()
1595 .entry(*upstream_fragment_id)
1596 .or_default()
1597 .insert(
1598 *upstream_actor_id,
1599 PbActorInfo {
1600 actor_id: *upstream_actor_id,
1601 host: Some(upstream_actor_host.clone()),
1602 },
1603 );
1604 }
1605 }
1606 }
1607 }
1608 }
1609 actor_upstreams
1610 }
1611}