1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::ControlStreamManager;
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76 SplitState, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
79#[derive(Debug, Clone)]
82pub struct Reschedule {
83 pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
85
86 pub removed_actors: HashSet<ActorId>,
88
89 pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
91
92 pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
94 pub upstream_dispatcher_mapping: Option<ActorMapping>,
99
100 pub downstream_fragment_ids: Vec<FragmentId>,
102
103 pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
107
108 pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
109}
110
111#[derive(Debug, Clone)]
118pub struct ReplaceStreamJobPlan {
119 pub old_fragments: StreamJobFragments,
120 pub new_fragments: StreamJobFragmentsToCreate,
121 pub replace_upstream: FragmentReplaceUpstream,
124 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
125 pub init_split_assignment: SplitAssignment,
131 pub streaming_job: StreamingJob,
133 pub tmp_id: JobId,
135 pub to_drop_state_table_ids: Vec<TableId>,
137 pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
138}
139
140impl ReplaceStreamJobPlan {
141 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142 let mut fragment_changes = HashMap::new();
143 for (fragment_id, new_fragment) in self
144 .new_fragments
145 .new_fragment_info(&self.init_split_assignment)
146 {
147 let fragment_change = CommandFragmentChanges::NewFragment {
148 job_id: self.streaming_job.id(),
149 info: new_fragment,
150 };
151 fragment_changes
152 .try_insert(fragment_id, fragment_change)
153 .expect("non-duplicate");
154 }
155 for fragment in self.old_fragments.fragments.values() {
156 fragment_changes
157 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158 .expect("non-duplicate");
159 }
160 for (fragment_id, replace_map) in &self.replace_upstream {
161 fragment_changes
162 .try_insert(
163 *fragment_id,
164 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165 )
166 .expect("non-duplicate");
167 }
168 if let Some(sinks) = &self.auto_refresh_schema_sinks {
169 for sink in sinks {
170 let fragment_change = CommandFragmentChanges::NewFragment {
171 job_id: sink.original_sink.id.as_job_id(),
172 info: sink.new_fragment_info(),
173 };
174 fragment_changes
175 .try_insert(sink.new_fragment.fragment_id, fragment_change)
176 .expect("non-duplicate");
177 fragment_changes
178 .try_insert(
179 sink.original_fragment.fragment_id,
180 CommandFragmentChanges::RemoveFragment,
181 )
182 .expect("non-duplicate");
183 }
184 }
185 fragment_changes
186 }
187
188 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190 let mut fragment_replacements = HashMap::new();
191 for (upstream_fragment_id, new_upstream_fragment_id) in
192 self.replace_upstream.values().flatten()
193 {
194 {
195 let r =
196 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197 if let Some(r) = r {
198 assert_eq!(
199 *new_upstream_fragment_id, r,
200 "one fragment is replaced by multiple fragments"
201 );
202 }
203 }
204 }
205 fragment_replacements
206 }
207}
208
209#[derive(educe::Educe, Clone)]
210#[educe(Debug)]
211pub struct CreateStreamingJobCommandInfo {
212 #[educe(Debug(ignore))]
213 pub stream_job_fragments: StreamJobFragmentsToCreate,
214 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
215 pub init_split_assignment: SplitAssignment,
216 pub definition: String,
217 pub job_type: StreamingJobType,
218 pub create_type: CreateType,
219 pub streaming_job: StreamingJob,
220 pub fragment_backfill_ordering: FragmentBackfillOrder,
221 pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
222 pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
223}
224
225impl StreamJobFragments {
226 pub(super) fn new_fragment_info<'a>(
227 &'a self,
228 assignment: &'a SplitAssignment,
229 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
230 self.fragments.values().map(|fragment| {
231 let mut fragment_splits = assignment
232 .get(&fragment.fragment_id)
233 .cloned()
234 .unwrap_or_default();
235
236 (
237 fragment.fragment_id,
238 InflightFragmentInfo {
239 fragment_id: fragment.fragment_id,
240 distribution_type: fragment.distribution_type.into(),
241 fragment_type_mask: fragment.fragment_type_mask,
242 vnode_count: fragment.vnode_count(),
243 nodes: fragment.nodes.clone(),
244 actors: fragment
245 .actors
246 .iter()
247 .map(|actor| {
248 (
249 actor.actor_id,
250 InflightActorInfo {
251 worker_id: self
252 .actor_status
253 .get(&actor.actor_id)
254 .expect("should exist")
255 .worker_id(),
256 vnode_bitmap: actor.vnode_bitmap.clone(),
257 splits: fragment_splits
258 .remove(&actor.actor_id)
259 .unwrap_or_default(),
260 },
261 )
262 })
263 .collect(),
264 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
265 },
266 )
267 })
268 }
269}
270
271#[derive(Debug, Clone)]
272pub struct SnapshotBackfillInfo {
273 pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
277}
278
279#[derive(Debug, Clone)]
280pub enum CreateStreamingJobType {
281 Normal,
282 SinkIntoTable(UpstreamSinkInfo),
283 SnapshotBackfill(SnapshotBackfillInfo),
284}
285
286#[derive(Debug)]
291pub enum Command {
292 Flush,
295
296 Pause,
299
300 Resume,
304
305 DropStreamingJobs {
313 streaming_job_ids: HashSet<JobId>,
314 actors: Vec<ActorId>,
315 unregistered_state_table_ids: HashSet<TableId>,
316 unregistered_fragment_ids: HashSet<FragmentId>,
317 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
319 },
320
321 CreateStreamingJob {
331 info: CreateStreamingJobCommandInfo,
332 job_type: CreateStreamingJobType,
333 cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
334 },
335
336 RescheduleFragment {
342 reschedules: HashMap<FragmentId, Reschedule>,
343 fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
345 },
346
347 ReplaceStreamJob(ReplaceStreamJobPlan),
354
355 SourceChangeSplit(SplitState),
358
359 Throttle {
362 jobs: HashSet<JobId>,
363 config: HashMap<FragmentId, Option<u32>>,
364 },
365
366 CreateSubscription {
369 subscription_id: SubscriptionId,
370 upstream_mv_table_id: TableId,
371 retention_second: u64,
372 },
373
374 DropSubscription {
378 subscription_id: SubscriptionId,
379 upstream_mv_table_id: TableId,
380 },
381
382 ConnectorPropsChange(ConnectorPropsChange),
383
384 Refresh {
387 table_id: TableId,
388 associated_source_id: SourceId,
389 },
390 ListFinish {
391 table_id: TableId,
392 associated_source_id: SourceId,
393 },
394 LoadFinish {
395 table_id: TableId,
396 associated_source_id: SourceId,
397 },
398
399 ResetSource {
402 source_id: SourceId,
403 },
404}
405
406impl std::fmt::Display for Command {
408 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
409 match self {
410 Command::Flush => write!(f, "Flush"),
411 Command::Pause => write!(f, "Pause"),
412 Command::Resume => write!(f, "Resume"),
413 Command::DropStreamingJobs {
414 streaming_job_ids, ..
415 } => {
416 write!(
417 f,
418 "DropStreamingJobs: {}",
419 streaming_job_ids.iter().sorted().join(", ")
420 )
421 }
422 Command::CreateStreamingJob { info, .. } => {
423 write!(f, "CreateStreamingJob: {}", info.streaming_job)
424 }
425 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
426 Command::ReplaceStreamJob(plan) => {
427 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
428 }
429 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
430 Command::Throttle { .. } => write!(f, "Throttle"),
431 Command::CreateSubscription {
432 subscription_id, ..
433 } => write!(f, "CreateSubscription: {subscription_id}"),
434 Command::DropSubscription {
435 subscription_id, ..
436 } => write!(f, "DropSubscription: {subscription_id}"),
437 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
438 Command::Refresh {
439 table_id,
440 associated_source_id,
441 } => write!(
442 f,
443 "Refresh: {} (source: {})",
444 table_id, associated_source_id
445 ),
446 Command::ListFinish {
447 table_id,
448 associated_source_id,
449 } => write!(
450 f,
451 "ListFinish: {} (source: {})",
452 table_id, associated_source_id
453 ),
454 Command::LoadFinish {
455 table_id,
456 associated_source_id,
457 } => write!(
458 f,
459 "LoadFinish: {} (source: {})",
460 table_id, associated_source_id
461 ),
462 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
463 }
464 }
465}
466
467impl Command {
468 pub fn pause() -> Self {
469 Self::Pause
470 }
471
472 pub fn resume() -> Self {
473 Self::Resume
474 }
475
476 #[expect(clippy::type_complexity)]
477 pub(super) fn fragment_changes(
478 &self,
479 ) -> Option<(
480 Option<(JobId, Option<CdcTableBackfillTracker>)>,
481 HashMap<FragmentId, CommandFragmentChanges>,
482 )> {
483 match self {
484 Command::Flush => None,
485 Command::Pause => None,
486 Command::Resume => None,
487 Command::DropStreamingJobs {
488 unregistered_fragment_ids,
489 dropped_sink_fragment_by_targets,
490 ..
491 } => {
492 let changes = unregistered_fragment_ids
493 .iter()
494 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
495 .chain(dropped_sink_fragment_by_targets.iter().map(
496 |(target_fragment, sink_fragments)| {
497 (
498 *target_fragment,
499 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
500 )
501 },
502 ))
503 .collect();
504
505 Some((None, changes))
506 }
507 Command::CreateStreamingJob { info, job_type, .. } => {
508 assert!(
509 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
510 "should handle fragment changes separately for snapshot backfill"
511 );
512 let mut changes: HashMap<_, _> = info
513 .stream_job_fragments
514 .new_fragment_info(&info.init_split_assignment)
515 .map(|(fragment_id, fragment_infos)| {
516 (
517 fragment_id,
518 CommandFragmentChanges::NewFragment {
519 job_id: info.streaming_job.id(),
520 info: fragment_infos,
521 },
522 )
523 })
524 .collect();
525
526 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
527 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
528 changes.insert(
529 downstream_fragment_id,
530 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
531 upstream_fragment_id: ctx.sink_fragment_id,
532 sink_output_schema: ctx.sink_output_fields.clone(),
533 project_exprs: ctx.project_exprs.clone(),
534 }),
535 );
536 }
537
538 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
539 let (fragment, _) =
540 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
541 .expect("should have parallel cdc fragment");
542 Some(CdcTableBackfillTracker::new(
543 fragment.fragment_id,
544 splits.clone(),
545 ))
546 } else {
547 None
548 };
549
550 Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
551 }
552 Command::RescheduleFragment { reschedules, .. } => Some((
553 None,
554 reschedules
555 .iter()
556 .map(|(fragment_id, reschedule)| {
557 (
558 *fragment_id,
559 CommandFragmentChanges::Reschedule {
560 new_actors: reschedule
561 .added_actors
562 .iter()
563 .flat_map(|(node_id, actors)| {
564 actors.iter().map(|actor_id| {
565 (
566 *actor_id,
567 InflightActorInfo {
568 worker_id: *node_id,
569 vnode_bitmap: reschedule
570 .newly_created_actors
571 .get(actor_id)
572 .expect("should exist")
573 .0
574 .0
575 .vnode_bitmap
576 .clone(),
577 splits: reschedule
578 .actor_splits
579 .get(actor_id)
580 .cloned()
581 .unwrap_or_default(),
582 },
583 )
584 })
585 })
586 .collect(),
587 actor_update_vnode_bitmap: reschedule
588 .vnode_bitmap_updates
589 .iter()
590 .filter(|(actor_id, _)| {
591 !reschedule.newly_created_actors.contains_key(actor_id)
593 })
594 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
595 .collect(),
596 to_remove: reschedule.removed_actors.iter().cloned().collect(),
597 actor_splits: reschedule.actor_splits.clone(),
598 },
599 )
600 })
601 .collect(),
602 )),
603 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
604 Command::SourceChangeSplit(SplitState {
605 split_assignment, ..
606 }) => Some((
607 None,
608 split_assignment
609 .iter()
610 .map(|(&fragment_id, splits)| {
611 (
612 fragment_id,
613 CommandFragmentChanges::SplitAssignment {
614 actor_splits: splits.clone(),
615 },
616 )
617 })
618 .collect(),
619 )),
620 Command::Throttle { .. } => None,
621 Command::CreateSubscription { .. } => None,
622 Command::DropSubscription { .. } => None,
623 Command::ConnectorPropsChange(_) => None,
624 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, Command::ResetSource { .. } => None, }
629 }
630
631 pub fn need_checkpoint(&self) -> bool {
632 !matches!(self, Command::Resume)
634 }
635}
636
637#[derive(Debug, Clone)]
638pub enum BarrierKind {
639 Initial,
640 Barrier,
641 Checkpoint(Vec<u64>),
643}
644
645impl BarrierKind {
646 pub fn to_protobuf(&self) -> PbBarrierKind {
647 match self {
648 BarrierKind::Initial => PbBarrierKind::Initial,
649 BarrierKind::Barrier => PbBarrierKind::Barrier,
650 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
651 }
652 }
653
654 pub fn is_checkpoint(&self) -> bool {
655 matches!(self, BarrierKind::Checkpoint(_))
656 }
657
658 pub fn is_initial(&self) -> bool {
659 matches!(self, BarrierKind::Initial)
660 }
661
662 pub fn as_str_name(&self) -> &'static str {
663 match self {
664 BarrierKind::Initial => "Initial",
665 BarrierKind::Barrier => "Barrier",
666 BarrierKind::Checkpoint(_) => "Checkpoint",
667 }
668 }
669}
670
671pub(super) struct CommandContext {
674 mv_subscription_max_retention: HashMap<TableId, u64>,
675
676 pub(super) barrier_info: BarrierInfo,
677
678 pub(super) table_ids_to_commit: HashSet<TableId>,
679
680 pub(super) command: Option<Command>,
681
682 _span: tracing::Span,
688}
689
690impl std::fmt::Debug for CommandContext {
691 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
692 f.debug_struct("CommandContext")
693 .field("barrier_info", &self.barrier_info)
694 .field("command", &self.command)
695 .finish()
696 }
697}
698
699impl std::fmt::Display for CommandContext {
700 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
701 write!(
702 f,
703 "prev_epoch={}, curr_epoch={}, kind={}",
704 self.barrier_info.prev_epoch(),
705 self.barrier_info.curr_epoch(),
706 self.barrier_info.kind.as_str_name()
707 )?;
708 if let Some(command) = &self.command {
709 write!(f, ", command={}", command)?;
710 }
711 Ok(())
712 }
713}
714
715impl CommandContext {
716 pub(super) fn new(
717 barrier_info: BarrierInfo,
718 mv_subscription_max_retention: HashMap<TableId, u64>,
719 table_ids_to_commit: HashSet<TableId>,
720 command: Option<Command>,
721 span: tracing::Span,
722 ) -> Self {
723 Self {
724 mv_subscription_max_retention,
725 barrier_info,
726 table_ids_to_commit,
727 command,
728 _span: span,
729 }
730 }
731
732 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
733 let Some(truncate_timestamptz) = Timestamptz::from_secs(
734 self.barrier_info
735 .prev_epoch
736 .value()
737 .as_timestamptz()
738 .timestamp()
739 - retention_second as i64,
740 ) else {
741 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
742 return self.barrier_info.prev_epoch.value();
743 };
744 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
745 }
746
747 pub(super) fn collect_commit_epoch_info(
748 &self,
749 info: &mut CommitEpochInfo,
750 resps: Vec<BarrierCompleteResponse>,
751 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
752 ) {
753 let (
754 sst_to_context,
755 synced_ssts,
756 new_table_watermarks,
757 old_value_ssts,
758 vector_index_adds,
759 truncate_tables,
760 ) = collect_resp_info(resps);
761
762 let new_table_fragment_infos =
763 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
764 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
765 {
766 let table_fragments = &info.stream_job_fragments;
767 let mut table_ids: HashSet<_> =
768 table_fragments.internal_table_ids().into_iter().collect();
769 if let Some(mv_table_id) = table_fragments.mv_table_id() {
770 table_ids.insert(mv_table_id);
771 }
772
773 vec![NewTableFragmentInfo { table_ids }]
774 } else {
775 vec![]
776 };
777
778 let mut mv_log_store_truncate_epoch = HashMap::new();
779 let mut update_truncate_epoch =
781 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
782 Entry::Occupied(mut entry) => {
783 let prev_truncate_epoch = entry.get_mut();
784 if truncate_epoch < *prev_truncate_epoch {
785 *prev_truncate_epoch = truncate_epoch;
786 }
787 }
788 Entry::Vacant(entry) => {
789 entry.insert(truncate_epoch);
790 }
791 };
792 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
793 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
794 update_truncate_epoch(*mv_table_id, truncate_epoch);
795 }
796 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
797 for mv_table_id in upstream_mv_table_ids {
798 update_truncate_epoch(mv_table_id, backfill_epoch);
799 }
800 }
801
802 let table_new_change_log = build_table_change_log_delta(
803 old_value_ssts.into_iter(),
804 synced_ssts.iter().map(|sst| &sst.sst_info),
805 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
806 mv_log_store_truncate_epoch.into_iter(),
807 );
808
809 let epoch = self.barrier_info.prev_epoch();
810 for table_id in &self.table_ids_to_commit {
811 info.tables_to_commit
812 .try_insert(*table_id, epoch)
813 .expect("non duplicate");
814 }
815
816 info.sstables.extend(synced_ssts);
817 info.new_table_watermarks.extend(new_table_watermarks);
818 info.sst_to_context.extend(sst_to_context);
819 info.new_table_fragment_infos
820 .extend(new_table_fragment_infos);
821 info.change_log_delta.extend(table_new_change_log);
822 for (table_id, vector_index_adds) in vector_index_adds {
823 info.vector_index_delta
824 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
825 .expect("non-duplicate");
826 }
827 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
828 for fragment in job_info.stream_job_fragments.fragments.values() {
829 visit_stream_node_cont(&fragment.nodes, |node| {
830 match node.node_body.as_ref().unwrap() {
831 NodeBody::VectorIndexWrite(vector_index_write) => {
832 let index_table = vector_index_write.table.as_ref().unwrap();
833 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
834 info.vector_index_delta
835 .try_insert(
836 index_table.id,
837 VectorIndexDelta::Init(PbVectorIndexInit {
838 info: Some(index_table.vector_index_info.unwrap()),
839 }),
840 )
841 .expect("non-duplicate");
842 false
843 }
844 _ => true,
845 }
846 })
847 }
848 }
849 info.truncate_tables.extend(truncate_tables);
850 }
851}
852
853impl Command {
854 pub(super) fn to_mutation(
858 &self,
859 is_currently_paused: bool,
860 edges: &mut Option<FragmentEdgeBuildResult>,
861 control_stream_manager: &ControlStreamManager,
862 database_info: &mut InflightDatabaseInfo,
863 ) -> MetaResult<Option<Mutation>> {
864 let mutation = match self {
865 Command::Flush => None,
866
867 Command::Pause => {
868 if !is_currently_paused {
871 Some(Mutation::Pause(PauseMutation {}))
872 } else {
873 None
874 }
875 }
876
877 Command::Resume => {
878 if is_currently_paused {
880 Some(Mutation::Resume(ResumeMutation {}))
881 } else {
882 None
883 }
884 }
885
886 Command::SourceChangeSplit(SplitState {
887 split_assignment, ..
888 }) => {
889 let mut diff = HashMap::new();
890
891 for actor_splits in split_assignment.values() {
892 diff.extend(actor_splits.clone());
893 }
894
895 Some(Mutation::Splits(SourceChangeSplitMutation {
896 actor_splits: build_actor_connector_splits(&diff),
897 }))
898 }
899
900 Command::Throttle { config, .. } => {
901 let mut fragment_to_apply = HashMap::new();
902 for (fragment_id, limit) in config {
903 fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
904 }
905
906 Some(Mutation::Throttle(ThrottleMutation {
907 fragment_throttle: fragment_to_apply,
908 }))
909 }
910
911 Command::DropStreamingJobs {
912 actors,
913 dropped_sink_fragment_by_targets,
914 ..
915 } => Some(Mutation::Stop(StopMutation {
916 actors: actors.clone(),
917 dropped_sink_fragments: dropped_sink_fragment_by_targets
918 .values()
919 .flatten()
920 .cloned()
921 .collect(),
922 })),
923
924 Command::CreateStreamingJob {
925 info:
926 CreateStreamingJobCommandInfo {
927 stream_job_fragments,
928 init_split_assignment: split_assignment,
929 upstream_fragment_downstreams,
930 fragment_backfill_ordering,
931 ..
932 },
933 job_type,
934 ..
935 } => {
936 let edges = edges.as_mut().expect("should exist");
937 let added_actors = stream_job_fragments.actor_ids().collect();
938 let actor_splits = split_assignment
939 .values()
940 .flat_map(build_actor_connector_splits)
941 .collect();
942 let subscriptions_to_add =
943 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
944 job_type
945 {
946 snapshot_backfill_info
947 .upstream_mv_table_id_to_backfill_epoch
948 .keys()
949 .map(|table_id| SubscriptionUpstreamInfo {
950 subscriber_id: stream_job_fragments
951 .stream_job_id()
952 .as_subscriber_id(),
953 upstream_mv_table_id: *table_id,
954 })
955 .collect()
956 } else {
957 Default::default()
958 };
959 let backfill_nodes_to_pause: Vec<_> =
960 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
961 .into_iter()
962 .collect();
963
964 let new_upstream_sinks =
965 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
966 sink_fragment_id,
967 sink_output_fields,
968 project_exprs,
969 new_sink_downstream,
970 ..
971 }) = job_type
972 {
973 let new_sink_actors = stream_job_fragments
974 .actors_to_create()
975 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
976 .exactly_one()
977 .map(|(_, _, actors)| {
978 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
979 actor_id: actor.actor_id,
980 host: Some(control_stream_manager.host_addr(worker_id)),
981 })
982 })
983 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
984 let new_upstream_sink = PbNewUpstreamSink {
985 info: Some(PbUpstreamSinkInfo {
986 upstream_fragment_id: *sink_fragment_id,
987 sink_output_schema: sink_output_fields.clone(),
988 project_exprs: project_exprs.clone(),
989 }),
990 upstream_actors: new_sink_actors.collect(),
991 };
992 HashMap::from([(
993 new_sink_downstream.downstream_fragment_id,
994 new_upstream_sink,
995 )])
996 } else {
997 HashMap::new()
998 };
999
1000 let actor_cdc_table_snapshot_splits =
1001 if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1002 database_info
1003 .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1004 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1005 } else {
1006 None
1007 };
1008
1009 let add_mutation = AddMutation {
1010 actor_dispatchers: edges
1011 .dispatchers
1012 .extract_if(|fragment_id, _| {
1013 upstream_fragment_downstreams.contains_key(fragment_id)
1014 })
1015 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1016 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1017 .collect(),
1018 added_actors,
1019 actor_splits,
1020 pause: is_currently_paused,
1022 subscriptions_to_add,
1023 backfill_nodes_to_pause,
1024 actor_cdc_table_snapshot_splits,
1025 new_upstream_sinks,
1026 };
1027
1028 Some(Mutation::Add(add_mutation))
1029 }
1030
1031 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1032 old_fragments,
1033 replace_upstream,
1034 upstream_fragment_downstreams,
1035 init_split_assignment,
1036 auto_refresh_schema_sinks,
1037 ..
1038 }) => {
1039 let edges = edges.as_mut().expect("should exist");
1040 let merge_updates = edges
1041 .merge_updates
1042 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1043 .collect();
1044 let dispatchers = edges
1045 .dispatchers
1046 .extract_if(|fragment_id, _| {
1047 upstream_fragment_downstreams.contains_key(fragment_id)
1048 })
1049 .collect();
1050 let actor_cdc_table_snapshot_splits = database_info
1051 .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1052 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1053 Self::generate_update_mutation_for_replace_table(
1054 old_fragments.actor_ids().chain(
1055 auto_refresh_schema_sinks
1056 .as_ref()
1057 .into_iter()
1058 .flat_map(|sinks| {
1059 sinks.iter().flat_map(|sink| {
1060 sink.original_fragment
1061 .actors
1062 .iter()
1063 .map(|actor| actor.actor_id)
1064 })
1065 }),
1066 ),
1067 merge_updates,
1068 dispatchers,
1069 init_split_assignment,
1070 actor_cdc_table_snapshot_splits,
1071 auto_refresh_schema_sinks.as_ref(),
1072 )
1073 }
1074
1075 Command::RescheduleFragment {
1076 reschedules,
1077 fragment_actors,
1078 ..
1079 } => {
1080 let mut dispatcher_update = HashMap::new();
1081 for reschedule in reschedules.values() {
1082 for &(upstream_fragment_id, dispatcher_id) in
1083 &reschedule.upstream_fragment_dispatcher_ids
1084 {
1085 let upstream_actor_ids = fragment_actors
1087 .get(&upstream_fragment_id)
1088 .expect("should contain");
1089
1090 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1091
1092 for &actor_id in upstream_actor_ids {
1094 let added_downstream_actor_id = if upstream_reschedule
1095 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1096 .unwrap_or(true)
1097 {
1098 reschedule
1099 .added_actors
1100 .values()
1101 .flatten()
1102 .cloned()
1103 .collect()
1104 } else {
1105 Default::default()
1106 };
1107 dispatcher_update
1109 .try_insert(
1110 (actor_id, dispatcher_id),
1111 DispatcherUpdate {
1112 actor_id,
1113 dispatcher_id,
1114 hash_mapping: reschedule
1115 .upstream_dispatcher_mapping
1116 .as_ref()
1117 .map(|m| m.to_protobuf()),
1118 added_downstream_actor_id,
1119 removed_downstream_actor_id: reschedule
1120 .removed_actors
1121 .iter()
1122 .cloned()
1123 .collect(),
1124 },
1125 )
1126 .unwrap();
1127 }
1128 }
1129 }
1130 let dispatcher_update = dispatcher_update.into_values().collect();
1131
1132 let mut merge_update = HashMap::new();
1133 for (&fragment_id, reschedule) in reschedules {
1134 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1135 let downstream_actor_ids = fragment_actors
1137 .get(&downstream_fragment_id)
1138 .expect("should contain");
1139
1140 let downstream_removed_actors: HashSet<_> = reschedules
1144 .get(&downstream_fragment_id)
1145 .map(|downstream_reschedule| {
1146 downstream_reschedule
1147 .removed_actors
1148 .iter()
1149 .copied()
1150 .collect()
1151 })
1152 .unwrap_or_default();
1153
1154 for &actor_id in downstream_actor_ids {
1156 if downstream_removed_actors.contains(&actor_id) {
1157 continue;
1158 }
1159
1160 merge_update
1162 .try_insert(
1163 (actor_id, fragment_id),
1164 MergeUpdate {
1165 actor_id,
1166 upstream_fragment_id: fragment_id,
1167 new_upstream_fragment_id: None,
1168 added_upstream_actors: reschedule
1169 .added_actors
1170 .iter()
1171 .flat_map(|(worker_id, actors)| {
1172 let host =
1173 control_stream_manager.host_addr(*worker_id);
1174 actors.iter().map(move |&actor_id| PbActorInfo {
1175 actor_id,
1176 host: Some(host.clone()),
1177 })
1178 })
1179 .collect(),
1180 removed_upstream_actor_id: reschedule
1181 .removed_actors
1182 .iter()
1183 .cloned()
1184 .collect(),
1185 },
1186 )
1187 .unwrap();
1188 }
1189 }
1190 }
1191 let merge_update = merge_update.into_values().collect();
1192
1193 let mut actor_vnode_bitmap_update = HashMap::new();
1194 for reschedule in reschedules.values() {
1195 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1197 let bitmap = bitmap.to_protobuf();
1198 actor_vnode_bitmap_update
1199 .try_insert(actor_id, bitmap)
1200 .unwrap();
1201 }
1202 }
1203 let dropped_actors = reschedules
1204 .values()
1205 .flat_map(|r| r.removed_actors.iter().copied())
1206 .collect();
1207 let mut actor_splits = HashMap::new();
1208 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1209 for (fragment_id, reschedule) in reschedules {
1210 for (actor_id, splits) in &reschedule.actor_splits {
1211 actor_splits.insert(
1212 *actor_id,
1213 ConnectorSplits {
1214 splits: splits.iter().map(ConnectorSplit::from).collect(),
1215 },
1216 );
1217 }
1218
1219 if let Some(assignment) =
1220 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1221 {
1222 actor_cdc_table_snapshot_splits.extend(assignment)
1223 }
1224 }
1225
1226 let actor_new_dispatchers = HashMap::new();
1228 let mutation = Mutation::Update(UpdateMutation {
1229 dispatcher_update,
1230 merge_update,
1231 actor_vnode_bitmap_update,
1232 dropped_actors,
1233 actor_splits,
1234 actor_new_dispatchers,
1235 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1236 splits: actor_cdc_table_snapshot_splits,
1237 }),
1238 sink_schema_change: Default::default(),
1239 subscriptions_to_drop: vec![],
1240 });
1241 tracing::debug!("update mutation: {mutation:?}");
1242 Some(mutation)
1243 }
1244
1245 Command::CreateSubscription {
1246 upstream_mv_table_id,
1247 subscription_id,
1248 ..
1249 } => Some(Mutation::Add(AddMutation {
1250 actor_dispatchers: Default::default(),
1251 added_actors: vec![],
1252 actor_splits: Default::default(),
1253 pause: false,
1254 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1255 upstream_mv_table_id: *upstream_mv_table_id,
1256 subscriber_id: subscription_id.as_subscriber_id(),
1257 }],
1258 backfill_nodes_to_pause: vec![],
1259 actor_cdc_table_snapshot_splits: None,
1260 new_upstream_sinks: Default::default(),
1261 })),
1262 Command::DropSubscription {
1263 upstream_mv_table_id,
1264 subscription_id,
1265 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1266 info: vec![SubscriptionUpstreamInfo {
1267 subscriber_id: subscription_id.as_subscriber_id(),
1268 upstream_mv_table_id: *upstream_mv_table_id,
1269 }],
1270 })),
1271 Command::ConnectorPropsChange(config) => {
1272 let mut connector_props_infos = HashMap::default();
1273 for (k, v) in config {
1274 connector_props_infos.insert(
1275 k.as_raw_id(),
1276 ConnectorPropsInfo {
1277 connector_props_info: v.clone(),
1278 },
1279 );
1280 }
1281 Some(Mutation::ConnectorPropsChange(
1282 ConnectorPropsChangeMutation {
1283 connector_props_infos,
1284 },
1285 ))
1286 }
1287 Command::Refresh {
1288 table_id,
1289 associated_source_id,
1290 } => Some(Mutation::RefreshStart(
1291 risingwave_pb::stream_plan::RefreshStartMutation {
1292 table_id: *table_id,
1293 associated_source_id: *associated_source_id,
1294 },
1295 )),
1296 Command::ListFinish {
1297 table_id: _,
1298 associated_source_id,
1299 } => Some(Mutation::ListFinish(ListFinishMutation {
1300 associated_source_id: *associated_source_id,
1301 })),
1302 Command::LoadFinish {
1303 table_id: _,
1304 associated_source_id,
1305 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1306 associated_source_id: *associated_source_id,
1307 })),
1308 Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1309 risingwave_pb::stream_plan::ResetSourceMutation {
1310 source_id: source_id.as_raw_id(),
1311 },
1312 )),
1313 };
1314 Ok(mutation)
1315 }
1316
1317 pub(super) fn actors_to_create(
1318 &self,
1319 database_info: &InflightDatabaseInfo,
1320 edges: &mut Option<FragmentEdgeBuildResult>,
1321 control_stream_manager: &ControlStreamManager,
1322 ) -> Option<StreamJobActorsToCreate> {
1323 match self {
1324 Command::CreateStreamingJob { info, job_type, .. } => {
1325 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1326 return None;
1328 }
1329 let actors_to_create = info.stream_job_fragments.actors_to_create();
1330 let edges = edges.as_mut().expect("should exist");
1331 Some(edges.collect_actors_to_create(actors_to_create.map(
1332 |(fragment_id, node, actors)| {
1333 (
1334 fragment_id,
1335 node,
1336 actors,
1337 [], )
1339 },
1340 )))
1341 }
1342 Command::RescheduleFragment {
1343 reschedules,
1344 fragment_actors,
1345 ..
1346 } => {
1347 let mut actor_upstreams = Self::collect_actor_upstreams(
1348 reschedules.iter().map(|(fragment_id, reschedule)| {
1349 (
1350 *fragment_id,
1351 reschedule.newly_created_actors.values().map(
1352 |((actor, dispatchers), _)| {
1353 (actor.actor_id, dispatchers.as_slice())
1354 },
1355 ),
1356 )
1357 }),
1358 Some((reschedules, fragment_actors)),
1359 database_info,
1360 control_stream_manager,
1361 );
1362 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1363 for (fragment_id, (actor, dispatchers), worker_id) in
1364 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1365 reschedule
1366 .newly_created_actors
1367 .values()
1368 .map(|(actors, status)| (*fragment_id, actors, status))
1369 })
1370 {
1371 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1372 map.entry(*worker_id)
1373 .or_default()
1374 .entry(fragment_id)
1375 .or_insert_with(|| {
1376 let node = database_info.fragment(fragment_id).nodes.clone();
1377 let subscribers =
1378 database_info.fragment_subscribers(fragment_id).collect();
1379 (node, vec![], subscribers)
1380 })
1381 .1
1382 .push((actor.clone(), upstreams, dispatchers.clone()));
1383 }
1384 Some(map)
1385 }
1386 Command::ReplaceStreamJob(replace_table) => {
1387 let edges = edges.as_mut().expect("should exist");
1388 let mut actors = edges.collect_actors_to_create(
1389 replace_table.new_fragments.actors_to_create().map(
1390 |(fragment_id, node, actors)| {
1391 (
1392 fragment_id,
1393 node,
1394 actors,
1395 database_info
1396 .job_subscribers(replace_table.old_fragments.stream_job_id),
1397 )
1398 },
1399 ),
1400 );
1401 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1402 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1403 (
1404 sink.new_fragment.fragment_id,
1405 &sink.new_fragment.nodes,
1406 sink.new_fragment.actors.iter().map(|actor| {
1407 (
1408 actor,
1409 sink.actor_status[&actor.actor_id]
1410 .location
1411 .as_ref()
1412 .unwrap()
1413 .worker_node_id,
1414 )
1415 }),
1416 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1417 )
1418 }));
1419 for (worker_id, fragment_actors) in sink_actors {
1420 actors.entry(worker_id).or_default().extend(fragment_actors);
1421 }
1422 }
1423 Some(actors)
1424 }
1425 _ => None,
1426 }
1427 }
1428
1429 fn generate_update_mutation_for_replace_table(
1430 dropped_actors: impl IntoIterator<Item = ActorId>,
1431 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1432 dispatchers: FragmentActorDispatchers,
1433 init_split_assignment: &SplitAssignment,
1434 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1435 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1436 ) -> Option<Mutation> {
1437 let dropped_actors = dropped_actors.into_iter().collect();
1438
1439 let actor_new_dispatchers = dispatchers
1440 .into_values()
1441 .flatten()
1442 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1443 .collect();
1444
1445 let actor_splits = init_split_assignment
1446 .values()
1447 .flat_map(build_actor_connector_splits)
1448 .collect();
1449 Some(Mutation::Update(UpdateMutation {
1450 actor_new_dispatchers,
1451 merge_update: merge_updates.into_values().flatten().collect(),
1452 dropped_actors,
1453 actor_splits,
1454 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1455 sink_schema_change: auto_refresh_schema_sinks
1456 .as_ref()
1457 .into_iter()
1458 .flat_map(|sinks| {
1459 sinks.iter().map(|sink| {
1460 (
1461 sink.original_sink.id.as_raw_id(),
1462 PbSinkSchemaChange {
1463 original_schema: sink
1464 .original_sink
1465 .columns
1466 .iter()
1467 .map(|col| PbField {
1468 data_type: Some(
1469 col.column_desc
1470 .as_ref()
1471 .unwrap()
1472 .column_type
1473 .as_ref()
1474 .unwrap()
1475 .clone(),
1476 ),
1477 name: col.column_desc.as_ref().unwrap().name.clone(),
1478 })
1479 .collect(),
1480 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1481 fields: sink
1482 .newly_add_fields
1483 .iter()
1484 .map(|field| field.to_prost())
1485 .collect(),
1486 })),
1487 },
1488 )
1489 })
1490 })
1491 .collect(),
1492 ..Default::default()
1493 }))
1494 }
1495
1496 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1498 match self {
1499 Command::DropStreamingJobs {
1500 streaming_job_ids, ..
1501 } => Some(streaming_job_ids.iter().cloned()),
1502 _ => None,
1503 }
1504 .into_iter()
1505 .flatten()
1506 }
1507}
1508
1509impl Command {
1510 #[expect(clippy::type_complexity)]
1511 pub(super) fn collect_actor_upstreams(
1512 actor_dispatchers: impl Iterator<
1513 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1514 >,
1515 reschedule_dispatcher_update: Option<(
1516 &HashMap<FragmentId, Reschedule>,
1517 &HashMap<FragmentId, HashSet<ActorId>>,
1518 )>,
1519 database_info: &InflightDatabaseInfo,
1520 control_stream_manager: &ControlStreamManager,
1521 ) -> HashMap<ActorId, ActorUpstreams> {
1522 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1523 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1524 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1525 for (upstream_actor_id, dispatchers) in upstream_actors {
1526 let upstream_actor_location =
1527 upstream_fragment.actors[&upstream_actor_id].worker_id;
1528 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1529 for downstream_actor_id in dispatchers
1530 .iter()
1531 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1532 {
1533 actor_upstreams
1534 .entry(*downstream_actor_id)
1535 .or_default()
1536 .entry(upstream_fragment_id)
1537 .or_default()
1538 .insert(
1539 upstream_actor_id,
1540 PbActorInfo {
1541 actor_id: upstream_actor_id,
1542 host: Some(upstream_actor_host.clone()),
1543 },
1544 );
1545 }
1546 }
1547 }
1548 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1549 for reschedule in reschedules.values() {
1550 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1551 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1552 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1553 for upstream_actor_id in fragment_actors
1554 .get(upstream_fragment_id)
1555 .expect("should exist")
1556 {
1557 let upstream_actor_location =
1558 upstream_fragment.actors[upstream_actor_id].worker_id;
1559 let upstream_actor_host =
1560 control_stream_manager.host_addr(upstream_actor_location);
1561 if let Some(upstream_reschedule) = upstream_reschedule
1562 && upstream_reschedule
1563 .removed_actors
1564 .contains(upstream_actor_id)
1565 {
1566 continue;
1567 }
1568 for (_, downstream_actor_id) in
1569 reschedule
1570 .added_actors
1571 .iter()
1572 .flat_map(|(worker_id, actors)| {
1573 actors.iter().map(|actor| (*worker_id, *actor))
1574 })
1575 {
1576 actor_upstreams
1577 .entry(downstream_actor_id)
1578 .or_default()
1579 .entry(*upstream_fragment_id)
1580 .or_default()
1581 .insert(
1582 *upstream_actor_id,
1583 PbActorInfo {
1584 actor_id: *upstream_actor_id,
1585 host: Some(upstream_actor_host.clone()),
1586 },
1587 );
1588 }
1589 }
1590 }
1591 }
1592 }
1593 actor_upstreams
1594 }
1595}