1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52 SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75 AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76 SplitState, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
79#[derive(Debug, Clone)]
82pub struct Reschedule {
83 pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
85
86 pub removed_actors: HashSet<ActorId>,
88
89 pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
91
92 pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
94 pub upstream_dispatcher_mapping: Option<ActorMapping>,
99
100 pub downstream_fragment_ids: Vec<FragmentId>,
102
103 pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
107
108 pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
109}
110
111#[derive(Debug, Clone)]
118pub struct ReplaceStreamJobPlan {
119 pub old_fragments: StreamJobFragments,
120 pub new_fragments: StreamJobFragmentsToCreate,
121 pub replace_upstream: FragmentReplaceUpstream,
124 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
125 pub init_split_assignment: SplitAssignment,
131 pub streaming_job: StreamingJob,
133 pub tmp_id: JobId,
135 pub to_drop_state_table_ids: Vec<TableId>,
137 pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
138}
139
140impl ReplaceStreamJobPlan {
141 fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142 let mut fragment_changes = HashMap::new();
143 for (fragment_id, new_fragment) in self
144 .new_fragments
145 .new_fragment_info(&self.init_split_assignment)
146 {
147 let fragment_change = CommandFragmentChanges::NewFragment {
148 job_id: self.streaming_job.id(),
149 info: new_fragment,
150 };
151 fragment_changes
152 .try_insert(fragment_id, fragment_change)
153 .expect("non-duplicate");
154 }
155 for fragment in self.old_fragments.fragments.values() {
156 fragment_changes
157 .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158 .expect("non-duplicate");
159 }
160 for (fragment_id, replace_map) in &self.replace_upstream {
161 fragment_changes
162 .try_insert(
163 *fragment_id,
164 CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165 )
166 .expect("non-duplicate");
167 }
168 if let Some(sinks) = &self.auto_refresh_schema_sinks {
169 for sink in sinks {
170 let fragment_change = CommandFragmentChanges::NewFragment {
171 job_id: sink.original_sink.id.as_job_id(),
172 info: sink.new_fragment_info(),
173 };
174 fragment_changes
175 .try_insert(sink.new_fragment.fragment_id, fragment_change)
176 .expect("non-duplicate");
177 fragment_changes
178 .try_insert(
179 sink.original_fragment.fragment_id,
180 CommandFragmentChanges::RemoveFragment,
181 )
182 .expect("non-duplicate");
183 }
184 }
185 fragment_changes
186 }
187
188 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190 let mut fragment_replacements = HashMap::new();
191 for (upstream_fragment_id, new_upstream_fragment_id) in
192 self.replace_upstream.values().flatten()
193 {
194 {
195 let r =
196 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197 if let Some(r) = r {
198 assert_eq!(
199 *new_upstream_fragment_id, r,
200 "one fragment is replaced by multiple fragments"
201 );
202 }
203 }
204 }
205 fragment_replacements
206 }
207}
208
209#[derive(educe::Educe, Clone)]
210#[educe(Debug)]
211pub struct CreateStreamingJobCommandInfo {
212 #[educe(Debug(ignore))]
213 pub stream_job_fragments: StreamJobFragmentsToCreate,
214 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
215 pub init_split_assignment: SplitAssignment,
216 pub definition: String,
217 pub job_type: StreamingJobType,
218 pub create_type: CreateType,
219 pub streaming_job: StreamingJob,
220 pub fragment_backfill_ordering: FragmentBackfillOrder,
221 pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
222 pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
223 pub is_serverless: bool,
224}
225
226impl StreamJobFragments {
227 pub(super) fn new_fragment_info<'a>(
228 &'a self,
229 assignment: &'a SplitAssignment,
230 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
231 self.fragments.values().map(|fragment| {
232 let mut fragment_splits = assignment
233 .get(&fragment.fragment_id)
234 .cloned()
235 .unwrap_or_default();
236
237 (
238 fragment.fragment_id,
239 InflightFragmentInfo {
240 fragment_id: fragment.fragment_id,
241 distribution_type: fragment.distribution_type.into(),
242 fragment_type_mask: fragment.fragment_type_mask,
243 vnode_count: fragment.vnode_count(),
244 nodes: fragment.nodes.clone(),
245 actors: fragment
246 .actors
247 .iter()
248 .map(|actor| {
249 (
250 actor.actor_id,
251 InflightActorInfo {
252 worker_id: self
253 .actor_status
254 .get(&actor.actor_id)
255 .expect("should exist")
256 .worker_id(),
257 vnode_bitmap: actor.vnode_bitmap.clone(),
258 splits: fragment_splits
259 .remove(&actor.actor_id)
260 .unwrap_or_default(),
261 },
262 )
263 })
264 .collect(),
265 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
266 },
267 )
268 })
269 }
270}
271
272#[derive(Debug, Clone)]
273pub struct SnapshotBackfillInfo {
274 pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
278}
279
280#[derive(Debug, Clone)]
281pub enum CreateStreamingJobType {
282 Normal,
283 SinkIntoTable(UpstreamSinkInfo),
284 SnapshotBackfill(SnapshotBackfillInfo),
285}
286
287#[derive(Debug)]
292pub enum Command {
293 Flush,
296
297 Pause,
300
301 Resume,
305
306 DropStreamingJobs {
314 streaming_job_ids: HashSet<JobId>,
315 actors: Vec<ActorId>,
316 unregistered_state_table_ids: HashSet<TableId>,
317 unregistered_fragment_ids: HashSet<FragmentId>,
318 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
320 },
321
322 CreateStreamingJob {
332 info: CreateStreamingJobCommandInfo,
333 job_type: CreateStreamingJobType,
334 cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
335 },
336
337 RescheduleFragment {
343 reschedules: HashMap<FragmentId, Reschedule>,
344 fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
346 },
347
348 ReplaceStreamJob(ReplaceStreamJobPlan),
355
356 SourceChangeSplit(SplitState),
359
360 Throttle {
363 jobs: HashSet<JobId>,
364 config: HashMap<FragmentId, Option<u32>>,
365 },
366
367 CreateSubscription {
370 subscription_id: SubscriptionId,
371 upstream_mv_table_id: TableId,
372 retention_second: u64,
373 },
374
375 DropSubscription {
379 subscription_id: SubscriptionId,
380 upstream_mv_table_id: TableId,
381 },
382
383 ConnectorPropsChange(ConnectorPropsChange),
384
385 Refresh {
388 table_id: TableId,
389 associated_source_id: SourceId,
390 },
391 ListFinish {
392 table_id: TableId,
393 associated_source_id: SourceId,
394 },
395 LoadFinish {
396 table_id: TableId,
397 associated_source_id: SourceId,
398 },
399
400 ResetSource {
403 source_id: SourceId,
404 },
405}
406
407impl std::fmt::Display for Command {
409 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
410 match self {
411 Command::Flush => write!(f, "Flush"),
412 Command::Pause => write!(f, "Pause"),
413 Command::Resume => write!(f, "Resume"),
414 Command::DropStreamingJobs {
415 streaming_job_ids, ..
416 } => {
417 write!(
418 f,
419 "DropStreamingJobs: {}",
420 streaming_job_ids.iter().sorted().join(", ")
421 )
422 }
423 Command::CreateStreamingJob { info, .. } => {
424 write!(f, "CreateStreamingJob: {}", info.streaming_job)
425 }
426 Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
427 Command::ReplaceStreamJob(plan) => {
428 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
429 }
430 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
431 Command::Throttle { .. } => write!(f, "Throttle"),
432 Command::CreateSubscription {
433 subscription_id, ..
434 } => write!(f, "CreateSubscription: {subscription_id}"),
435 Command::DropSubscription {
436 subscription_id, ..
437 } => write!(f, "DropSubscription: {subscription_id}"),
438 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
439 Command::Refresh {
440 table_id,
441 associated_source_id,
442 } => write!(
443 f,
444 "Refresh: {} (source: {})",
445 table_id, associated_source_id
446 ),
447 Command::ListFinish {
448 table_id,
449 associated_source_id,
450 } => write!(
451 f,
452 "ListFinish: {} (source: {})",
453 table_id, associated_source_id
454 ),
455 Command::LoadFinish {
456 table_id,
457 associated_source_id,
458 } => write!(
459 f,
460 "LoadFinish: {} (source: {})",
461 table_id, associated_source_id
462 ),
463 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
464 }
465 }
466}
467
468impl Command {
469 pub fn pause() -> Self {
470 Self::Pause
471 }
472
473 pub fn resume() -> Self {
474 Self::Resume
475 }
476
477 #[expect(clippy::type_complexity)]
478 pub(super) fn fragment_changes(
479 &self,
480 ) -> Option<(
481 Option<(JobId, Option<CdcTableBackfillTracker>)>,
482 HashMap<FragmentId, CommandFragmentChanges>,
483 )> {
484 match self {
485 Command::Flush => None,
486 Command::Pause => None,
487 Command::Resume => None,
488 Command::DropStreamingJobs {
489 unregistered_fragment_ids,
490 dropped_sink_fragment_by_targets,
491 ..
492 } => {
493 let changes = unregistered_fragment_ids
494 .iter()
495 .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
496 .chain(dropped_sink_fragment_by_targets.iter().map(
497 |(target_fragment, sink_fragments)| {
498 (
499 *target_fragment,
500 CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
501 )
502 },
503 ))
504 .collect();
505
506 Some((None, changes))
507 }
508 Command::CreateStreamingJob { info, job_type, .. } => {
509 assert!(
510 !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
511 "should handle fragment changes separately for snapshot backfill"
512 );
513 let mut changes: HashMap<_, _> = info
514 .stream_job_fragments
515 .new_fragment_info(&info.init_split_assignment)
516 .map(|(fragment_id, fragment_infos)| {
517 (
518 fragment_id,
519 CommandFragmentChanges::NewFragment {
520 job_id: info.streaming_job.id(),
521 info: fragment_infos,
522 },
523 )
524 })
525 .collect();
526
527 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
528 let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
529 changes.insert(
530 downstream_fragment_id,
531 CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
532 upstream_fragment_id: ctx.sink_fragment_id,
533 sink_output_schema: ctx.sink_output_fields.clone(),
534 project_exprs: ctx.project_exprs.clone(),
535 }),
536 );
537 }
538
539 let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
540 let (fragment, _) =
541 parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
542 .expect("should have parallel cdc fragment");
543 Some(CdcTableBackfillTracker::new(
544 fragment.fragment_id,
545 splits.clone(),
546 ))
547 } else {
548 None
549 };
550
551 Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
552 }
553 Command::RescheduleFragment { reschedules, .. } => Some((
554 None,
555 reschedules
556 .iter()
557 .map(|(fragment_id, reschedule)| {
558 (
559 *fragment_id,
560 CommandFragmentChanges::Reschedule {
561 new_actors: reschedule
562 .added_actors
563 .iter()
564 .flat_map(|(node_id, actors)| {
565 actors.iter().map(|actor_id| {
566 (
567 *actor_id,
568 InflightActorInfo {
569 worker_id: *node_id,
570 vnode_bitmap: reschedule
571 .newly_created_actors
572 .get(actor_id)
573 .expect("should exist")
574 .0
575 .0
576 .vnode_bitmap
577 .clone(),
578 splits: reschedule
579 .actor_splits
580 .get(actor_id)
581 .cloned()
582 .unwrap_or_default(),
583 },
584 )
585 })
586 })
587 .collect(),
588 actor_update_vnode_bitmap: reschedule
589 .vnode_bitmap_updates
590 .iter()
591 .filter(|(actor_id, _)| {
592 !reschedule.newly_created_actors.contains_key(actor_id)
594 })
595 .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
596 .collect(),
597 to_remove: reschedule.removed_actors.iter().cloned().collect(),
598 actor_splits: reschedule.actor_splits.clone(),
599 },
600 )
601 })
602 .collect(),
603 )),
604 Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
605 Command::SourceChangeSplit(SplitState {
606 split_assignment, ..
607 }) => Some((
608 None,
609 split_assignment
610 .iter()
611 .map(|(&fragment_id, splits)| {
612 (
613 fragment_id,
614 CommandFragmentChanges::SplitAssignment {
615 actor_splits: splits.clone(),
616 },
617 )
618 })
619 .collect(),
620 )),
621 Command::Throttle { .. } => None,
622 Command::CreateSubscription { .. } => None,
623 Command::DropSubscription { .. } => None,
624 Command::ConnectorPropsChange(_) => None,
625 Command::Refresh { .. } => None, Command::ListFinish { .. } => None, Command::LoadFinish { .. } => None, Command::ResetSource { .. } => None, }
630 }
631
632 pub fn need_checkpoint(&self) -> bool {
633 !matches!(self, Command::Resume)
635 }
636}
637
638#[derive(Debug, Clone)]
639pub enum BarrierKind {
640 Initial,
641 Barrier,
642 Checkpoint(Vec<u64>),
644}
645
646impl BarrierKind {
647 pub fn to_protobuf(&self) -> PbBarrierKind {
648 match self {
649 BarrierKind::Initial => PbBarrierKind::Initial,
650 BarrierKind::Barrier => PbBarrierKind::Barrier,
651 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
652 }
653 }
654
655 pub fn is_checkpoint(&self) -> bool {
656 matches!(self, BarrierKind::Checkpoint(_))
657 }
658
659 pub fn is_initial(&self) -> bool {
660 matches!(self, BarrierKind::Initial)
661 }
662
663 pub fn as_str_name(&self) -> &'static str {
664 match self {
665 BarrierKind::Initial => "Initial",
666 BarrierKind::Barrier => "Barrier",
667 BarrierKind::Checkpoint(_) => "Checkpoint",
668 }
669 }
670}
671
672pub(super) struct CommandContext {
675 mv_subscription_max_retention: HashMap<TableId, u64>,
676
677 pub(super) barrier_info: BarrierInfo,
678
679 pub(super) table_ids_to_commit: HashSet<TableId>,
680
681 pub(super) command: Option<Command>,
682
683 _span: tracing::Span,
689}
690
691impl std::fmt::Debug for CommandContext {
692 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
693 f.debug_struct("CommandContext")
694 .field("barrier_info", &self.barrier_info)
695 .field("command", &self.command)
696 .finish()
697 }
698}
699
700impl std::fmt::Display for CommandContext {
701 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
702 write!(
703 f,
704 "prev_epoch={}, curr_epoch={}, kind={}",
705 self.barrier_info.prev_epoch(),
706 self.barrier_info.curr_epoch(),
707 self.barrier_info.kind.as_str_name()
708 )?;
709 if let Some(command) = &self.command {
710 write!(f, ", command={}", command)?;
711 }
712 Ok(())
713 }
714}
715
716impl CommandContext {
717 pub(super) fn new(
718 barrier_info: BarrierInfo,
719 mv_subscription_max_retention: HashMap<TableId, u64>,
720 table_ids_to_commit: HashSet<TableId>,
721 command: Option<Command>,
722 span: tracing::Span,
723 ) -> Self {
724 Self {
725 mv_subscription_max_retention,
726 barrier_info,
727 table_ids_to_commit,
728 command,
729 _span: span,
730 }
731 }
732
733 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
734 let Some(truncate_timestamptz) = Timestamptz::from_secs(
735 self.barrier_info
736 .prev_epoch
737 .value()
738 .as_timestamptz()
739 .timestamp()
740 - retention_second as i64,
741 ) else {
742 warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
743 return self.barrier_info.prev_epoch.value();
744 };
745 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
746 }
747
748 pub(super) fn collect_commit_epoch_info(
749 &self,
750 info: &mut CommitEpochInfo,
751 resps: Vec<BarrierCompleteResponse>,
752 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
753 ) {
754 let (
755 sst_to_context,
756 synced_ssts,
757 new_table_watermarks,
758 old_value_ssts,
759 vector_index_adds,
760 truncate_tables,
761 ) = collect_resp_info(resps);
762
763 let new_table_fragment_infos =
764 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
765 && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
766 {
767 let table_fragments = &info.stream_job_fragments;
768 let mut table_ids: HashSet<_> =
769 table_fragments.internal_table_ids().into_iter().collect();
770 if let Some(mv_table_id) = table_fragments.mv_table_id() {
771 table_ids.insert(mv_table_id);
772 }
773
774 vec![NewTableFragmentInfo { table_ids }]
775 } else {
776 vec![]
777 };
778
779 let mut mv_log_store_truncate_epoch = HashMap::new();
780 let mut update_truncate_epoch =
782 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
783 Entry::Occupied(mut entry) => {
784 let prev_truncate_epoch = entry.get_mut();
785 if truncate_epoch < *prev_truncate_epoch {
786 *prev_truncate_epoch = truncate_epoch;
787 }
788 }
789 Entry::Vacant(entry) => {
790 entry.insert(truncate_epoch);
791 }
792 };
793 for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
794 let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
795 update_truncate_epoch(*mv_table_id, truncate_epoch);
796 }
797 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
798 for mv_table_id in upstream_mv_table_ids {
799 update_truncate_epoch(mv_table_id, backfill_epoch);
800 }
801 }
802
803 let table_new_change_log = build_table_change_log_delta(
804 old_value_ssts.into_iter(),
805 synced_ssts.iter().map(|sst| &sst.sst_info),
806 must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
807 mv_log_store_truncate_epoch.into_iter(),
808 );
809
810 let epoch = self.barrier_info.prev_epoch();
811 for table_id in &self.table_ids_to_commit {
812 info.tables_to_commit
813 .try_insert(*table_id, epoch)
814 .expect("non duplicate");
815 }
816
817 info.sstables.extend(synced_ssts);
818 info.new_table_watermarks.extend(new_table_watermarks);
819 info.sst_to_context.extend(sst_to_context);
820 info.new_table_fragment_infos
821 .extend(new_table_fragment_infos);
822 info.change_log_delta.extend(table_new_change_log);
823 for (table_id, vector_index_adds) in vector_index_adds {
824 info.vector_index_delta
825 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
826 .expect("non-duplicate");
827 }
828 if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
829 for fragment in job_info.stream_job_fragments.fragments.values() {
830 visit_stream_node_cont(&fragment.nodes, |node| {
831 match node.node_body.as_ref().unwrap() {
832 NodeBody::VectorIndexWrite(vector_index_write) => {
833 let index_table = vector_index_write.table.as_ref().unwrap();
834 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
835 info.vector_index_delta
836 .try_insert(
837 index_table.id,
838 VectorIndexDelta::Init(PbVectorIndexInit {
839 info: Some(index_table.vector_index_info.unwrap()),
840 }),
841 )
842 .expect("non-duplicate");
843 false
844 }
845 _ => true,
846 }
847 })
848 }
849 }
850 info.truncate_tables.extend(truncate_tables);
851 }
852}
853
854impl Command {
855 pub(super) fn to_mutation(
859 &self,
860 is_currently_paused: bool,
861 edges: &mut Option<FragmentEdgeBuildResult>,
862 control_stream_manager: &ControlStreamManager,
863 database_info: &mut InflightDatabaseInfo,
864 ) -> MetaResult<Option<Mutation>> {
865 let mutation = match self {
866 Command::Flush => None,
867
868 Command::Pause => {
869 if !is_currently_paused {
872 Some(Mutation::Pause(PauseMutation {}))
873 } else {
874 None
875 }
876 }
877
878 Command::Resume => {
879 if is_currently_paused {
881 Some(Mutation::Resume(ResumeMutation {}))
882 } else {
883 None
884 }
885 }
886
887 Command::SourceChangeSplit(SplitState {
888 split_assignment, ..
889 }) => {
890 let mut diff = HashMap::new();
891
892 for actor_splits in split_assignment.values() {
893 diff.extend(actor_splits.clone());
894 }
895
896 Some(Mutation::Splits(SourceChangeSplitMutation {
897 actor_splits: build_actor_connector_splits(&diff),
898 }))
899 }
900
901 Command::Throttle { config, .. } => {
902 let mut fragment_to_apply = HashMap::new();
903 for (fragment_id, limit) in config {
904 fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
905 }
906
907 Some(Mutation::Throttle(ThrottleMutation {
908 fragment_throttle: fragment_to_apply,
909 }))
910 }
911
912 Command::DropStreamingJobs {
913 actors,
914 dropped_sink_fragment_by_targets,
915 ..
916 } => Some(Mutation::Stop(StopMutation {
917 actors: actors.clone(),
918 dropped_sink_fragments: dropped_sink_fragment_by_targets
919 .values()
920 .flatten()
921 .cloned()
922 .collect(),
923 })),
924
925 Command::CreateStreamingJob {
926 info:
927 CreateStreamingJobCommandInfo {
928 stream_job_fragments,
929 init_split_assignment: split_assignment,
930 upstream_fragment_downstreams,
931 fragment_backfill_ordering,
932 ..
933 },
934 job_type,
935 ..
936 } => {
937 let edges = edges.as_mut().expect("should exist");
938 let added_actors = stream_job_fragments.actor_ids().collect();
939 let actor_splits = split_assignment
940 .values()
941 .flat_map(build_actor_connector_splits)
942 .collect();
943 let subscriptions_to_add =
944 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
945 job_type
946 {
947 snapshot_backfill_info
948 .upstream_mv_table_id_to_backfill_epoch
949 .keys()
950 .map(|table_id| SubscriptionUpstreamInfo {
951 subscriber_id: stream_job_fragments
952 .stream_job_id()
953 .as_subscriber_id(),
954 upstream_mv_table_id: *table_id,
955 })
956 .collect()
957 } else {
958 Default::default()
959 };
960 let backfill_nodes_to_pause: Vec<_> =
961 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
962 .into_iter()
963 .collect();
964
965 let new_upstream_sinks =
966 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
967 sink_fragment_id,
968 sink_output_fields,
969 project_exprs,
970 new_sink_downstream,
971 ..
972 }) = job_type
973 {
974 let new_sink_actors = stream_job_fragments
975 .actors_to_create()
976 .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
977 .exactly_one()
978 .map(|(_, _, actors)| {
979 actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
980 actor_id: actor.actor_id,
981 host: Some(control_stream_manager.host_addr(worker_id)),
982 partial_graph_id: to_partial_graph_id(None),
983 })
984 })
985 .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
986 let new_upstream_sink = PbNewUpstreamSink {
987 info: Some(PbUpstreamSinkInfo {
988 upstream_fragment_id: *sink_fragment_id,
989 sink_output_schema: sink_output_fields.clone(),
990 project_exprs: project_exprs.clone(),
991 }),
992 upstream_actors: new_sink_actors.collect(),
993 };
994 HashMap::from([(
995 new_sink_downstream.downstream_fragment_id,
996 new_upstream_sink,
997 )])
998 } else {
999 HashMap::new()
1000 };
1001
1002 let actor_cdc_table_snapshot_splits =
1003 if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1004 database_info
1005 .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1006 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1007 } else {
1008 None
1009 };
1010
1011 let add_mutation = AddMutation {
1012 actor_dispatchers: edges
1013 .dispatchers
1014 .extract_if(|fragment_id, _| {
1015 upstream_fragment_downstreams.contains_key(fragment_id)
1016 })
1017 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1018 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1019 .collect(),
1020 added_actors,
1021 actor_splits,
1022 pause: is_currently_paused,
1024 subscriptions_to_add,
1025 backfill_nodes_to_pause,
1026 actor_cdc_table_snapshot_splits,
1027 new_upstream_sinks,
1028 };
1029
1030 Some(Mutation::Add(add_mutation))
1031 }
1032
1033 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1034 old_fragments,
1035 replace_upstream,
1036 upstream_fragment_downstreams,
1037 init_split_assignment,
1038 auto_refresh_schema_sinks,
1039 ..
1040 }) => {
1041 let edges = edges.as_mut().expect("should exist");
1042 let merge_updates = edges
1043 .merge_updates
1044 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1045 .collect();
1046 let dispatchers = edges
1047 .dispatchers
1048 .extract_if(|fragment_id, _| {
1049 upstream_fragment_downstreams.contains_key(fragment_id)
1050 })
1051 .collect();
1052 let actor_cdc_table_snapshot_splits = database_info
1053 .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1054 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1055 Self::generate_update_mutation_for_replace_table(
1056 old_fragments.actor_ids().chain(
1057 auto_refresh_schema_sinks
1058 .as_ref()
1059 .into_iter()
1060 .flat_map(|sinks| {
1061 sinks.iter().flat_map(|sink| {
1062 sink.original_fragment
1063 .actors
1064 .iter()
1065 .map(|actor| actor.actor_id)
1066 })
1067 }),
1068 ),
1069 merge_updates,
1070 dispatchers,
1071 init_split_assignment,
1072 actor_cdc_table_snapshot_splits,
1073 auto_refresh_schema_sinks.as_ref(),
1074 )
1075 }
1076
1077 Command::RescheduleFragment {
1078 reschedules,
1079 fragment_actors,
1080 ..
1081 } => {
1082 let mut dispatcher_update = HashMap::new();
1083 for reschedule in reschedules.values() {
1084 for &(upstream_fragment_id, dispatcher_id) in
1085 &reschedule.upstream_fragment_dispatcher_ids
1086 {
1087 let upstream_actor_ids = fragment_actors
1089 .get(&upstream_fragment_id)
1090 .expect("should contain");
1091
1092 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1093
1094 for &actor_id in upstream_actor_ids {
1096 let added_downstream_actor_id = if upstream_reschedule
1097 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1098 .unwrap_or(true)
1099 {
1100 reschedule
1101 .added_actors
1102 .values()
1103 .flatten()
1104 .cloned()
1105 .collect()
1106 } else {
1107 Default::default()
1108 };
1109 dispatcher_update
1111 .try_insert(
1112 (actor_id, dispatcher_id),
1113 DispatcherUpdate {
1114 actor_id,
1115 dispatcher_id,
1116 hash_mapping: reschedule
1117 .upstream_dispatcher_mapping
1118 .as_ref()
1119 .map(|m| m.to_protobuf()),
1120 added_downstream_actor_id,
1121 removed_downstream_actor_id: reschedule
1122 .removed_actors
1123 .iter()
1124 .cloned()
1125 .collect(),
1126 },
1127 )
1128 .unwrap();
1129 }
1130 }
1131 }
1132 let dispatcher_update = dispatcher_update.into_values().collect();
1133
1134 let mut merge_update = HashMap::new();
1135 for (&fragment_id, reschedule) in reschedules {
1136 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1137 let downstream_actor_ids = fragment_actors
1139 .get(&downstream_fragment_id)
1140 .expect("should contain");
1141
1142 let downstream_removed_actors: HashSet<_> = reschedules
1146 .get(&downstream_fragment_id)
1147 .map(|downstream_reschedule| {
1148 downstream_reschedule
1149 .removed_actors
1150 .iter()
1151 .copied()
1152 .collect()
1153 })
1154 .unwrap_or_default();
1155
1156 for &actor_id in downstream_actor_ids {
1158 if downstream_removed_actors.contains(&actor_id) {
1159 continue;
1160 }
1161
1162 merge_update
1164 .try_insert(
1165 (actor_id, fragment_id),
1166 MergeUpdate {
1167 actor_id,
1168 upstream_fragment_id: fragment_id,
1169 new_upstream_fragment_id: None,
1170 added_upstream_actors: reschedule
1171 .added_actors
1172 .iter()
1173 .flat_map(|(worker_id, actors)| {
1174 let host =
1175 control_stream_manager.host_addr(*worker_id);
1176 actors.iter().map(move |&actor_id| PbActorInfo {
1177 actor_id,
1178 host: Some(host.clone()),
1179 partial_graph_id: to_partial_graph_id(None),
1181 })
1182 })
1183 .collect(),
1184 removed_upstream_actor_id: reschedule
1185 .removed_actors
1186 .iter()
1187 .cloned()
1188 .collect(),
1189 },
1190 )
1191 .unwrap();
1192 }
1193 }
1194 }
1195 let merge_update = merge_update.into_values().collect();
1196
1197 let mut actor_vnode_bitmap_update = HashMap::new();
1198 for reschedule in reschedules.values() {
1199 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1201 let bitmap = bitmap.to_protobuf();
1202 actor_vnode_bitmap_update
1203 .try_insert(actor_id, bitmap)
1204 .unwrap();
1205 }
1206 }
1207 let dropped_actors = reschedules
1208 .values()
1209 .flat_map(|r| r.removed_actors.iter().copied())
1210 .collect();
1211 let mut actor_splits = HashMap::new();
1212 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1213 for (fragment_id, reschedule) in reschedules {
1214 for (actor_id, splits) in &reschedule.actor_splits {
1215 actor_splits.insert(
1216 *actor_id,
1217 ConnectorSplits {
1218 splits: splits.iter().map(ConnectorSplit::from).collect(),
1219 },
1220 );
1221 }
1222
1223 if let Some(assignment) =
1224 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1225 {
1226 actor_cdc_table_snapshot_splits.extend(assignment)
1227 }
1228 }
1229
1230 let actor_new_dispatchers = HashMap::new();
1232 let mutation = Mutation::Update(UpdateMutation {
1233 dispatcher_update,
1234 merge_update,
1235 actor_vnode_bitmap_update,
1236 dropped_actors,
1237 actor_splits,
1238 actor_new_dispatchers,
1239 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1240 splits: actor_cdc_table_snapshot_splits,
1241 }),
1242 sink_schema_change: Default::default(),
1243 subscriptions_to_drop: vec![],
1244 });
1245 tracing::debug!("update mutation: {mutation:?}");
1246 Some(mutation)
1247 }
1248
1249 Command::CreateSubscription {
1250 upstream_mv_table_id,
1251 subscription_id,
1252 ..
1253 } => Some(Mutation::Add(AddMutation {
1254 actor_dispatchers: Default::default(),
1255 added_actors: vec![],
1256 actor_splits: Default::default(),
1257 pause: false,
1258 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1259 upstream_mv_table_id: *upstream_mv_table_id,
1260 subscriber_id: subscription_id.as_subscriber_id(),
1261 }],
1262 backfill_nodes_to_pause: vec![],
1263 actor_cdc_table_snapshot_splits: None,
1264 new_upstream_sinks: Default::default(),
1265 })),
1266 Command::DropSubscription {
1267 upstream_mv_table_id,
1268 subscription_id,
1269 } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1270 info: vec![SubscriptionUpstreamInfo {
1271 subscriber_id: subscription_id.as_subscriber_id(),
1272 upstream_mv_table_id: *upstream_mv_table_id,
1273 }],
1274 })),
1275 Command::ConnectorPropsChange(config) => {
1276 let mut connector_props_infos = HashMap::default();
1277 for (k, v) in config {
1278 connector_props_infos.insert(
1279 k.as_raw_id(),
1280 ConnectorPropsInfo {
1281 connector_props_info: v.clone(),
1282 },
1283 );
1284 }
1285 Some(Mutation::ConnectorPropsChange(
1286 ConnectorPropsChangeMutation {
1287 connector_props_infos,
1288 },
1289 ))
1290 }
1291 Command::Refresh {
1292 table_id,
1293 associated_source_id,
1294 } => Some(Mutation::RefreshStart(
1295 risingwave_pb::stream_plan::RefreshStartMutation {
1296 table_id: *table_id,
1297 associated_source_id: *associated_source_id,
1298 },
1299 )),
1300 Command::ListFinish {
1301 table_id: _,
1302 associated_source_id,
1303 } => Some(Mutation::ListFinish(ListFinishMutation {
1304 associated_source_id: *associated_source_id,
1305 })),
1306 Command::LoadFinish {
1307 table_id: _,
1308 associated_source_id,
1309 } => Some(Mutation::LoadFinish(LoadFinishMutation {
1310 associated_source_id: *associated_source_id,
1311 })),
1312 Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1313 risingwave_pb::stream_plan::ResetSourceMutation {
1314 source_id: source_id.as_raw_id(),
1315 },
1316 )),
1317 };
1318 Ok(mutation)
1319 }
1320
1321 pub(super) fn actors_to_create(
1322 &self,
1323 database_info: &InflightDatabaseInfo,
1324 edges: &mut Option<FragmentEdgeBuildResult>,
1325 control_stream_manager: &ControlStreamManager,
1326 ) -> Option<StreamJobActorsToCreate> {
1327 match self {
1328 Command::CreateStreamingJob { info, job_type, .. } => {
1329 if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1330 return None;
1332 }
1333 let actors_to_create = info.stream_job_fragments.actors_to_create();
1334 let edges = edges.as_mut().expect("should exist");
1335 Some(edges.collect_actors_to_create(actors_to_create.map(
1336 |(fragment_id, node, actors)| {
1337 (
1338 fragment_id,
1339 node,
1340 actors,
1341 [], )
1343 },
1344 )))
1345 }
1346 Command::RescheduleFragment {
1347 reschedules,
1348 fragment_actors,
1349 ..
1350 } => {
1351 let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1353 reschedules.iter().map(|(fragment_id, reschedule)| {
1354 (
1355 *fragment_id,
1356 reschedule.newly_created_actors.values().map(
1357 |((actor, dispatchers), _)| {
1358 (actor.actor_id, dispatchers.as_slice())
1359 },
1360 ),
1361 )
1362 }),
1363 Some((reschedules, fragment_actors)),
1364 database_info,
1365 control_stream_manager,
1366 );
1367 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1368 for (fragment_id, (actor, dispatchers), worker_id) in
1369 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1370 reschedule
1371 .newly_created_actors
1372 .values()
1373 .map(|(actors, status)| (*fragment_id, actors, status))
1374 })
1375 {
1376 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1377 map.entry(*worker_id)
1378 .or_default()
1379 .entry(fragment_id)
1380 .or_insert_with(|| {
1381 let node = database_info.fragment(fragment_id).nodes.clone();
1382 let subscribers =
1383 database_info.fragment_subscribers(fragment_id).collect();
1384 (node, vec![], subscribers)
1385 })
1386 .1
1387 .push((actor.clone(), upstreams, dispatchers.clone()));
1388 }
1389 Some(map)
1390 }
1391 Command::ReplaceStreamJob(replace_table) => {
1392 let edges = edges.as_mut().expect("should exist");
1393 let mut actors = edges.collect_actors_to_create(
1394 replace_table.new_fragments.actors_to_create().map(
1395 |(fragment_id, node, actors)| {
1396 (
1397 fragment_id,
1398 node,
1399 actors,
1400 database_info
1401 .job_subscribers(replace_table.old_fragments.stream_job_id),
1402 )
1403 },
1404 ),
1405 );
1406 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1407 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1408 (
1409 sink.new_fragment.fragment_id,
1410 &sink.new_fragment.nodes,
1411 sink.new_fragment.actors.iter().map(|actor| {
1412 (
1413 actor,
1414 sink.actor_status[&actor.actor_id]
1415 .location
1416 .as_ref()
1417 .unwrap()
1418 .worker_node_id,
1419 )
1420 }),
1421 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1422 )
1423 }));
1424 for (worker_id, fragment_actors) in sink_actors {
1425 actors.entry(worker_id).or_default().extend(fragment_actors);
1426 }
1427 }
1428 Some(actors)
1429 }
1430 _ => None,
1431 }
1432 }
1433
1434 fn generate_update_mutation_for_replace_table(
1435 dropped_actors: impl IntoIterator<Item = ActorId>,
1436 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1437 dispatchers: FragmentActorDispatchers,
1438 init_split_assignment: &SplitAssignment,
1439 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1440 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1441 ) -> Option<Mutation> {
1442 let dropped_actors = dropped_actors.into_iter().collect();
1443
1444 let actor_new_dispatchers = dispatchers
1445 .into_values()
1446 .flatten()
1447 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1448 .collect();
1449
1450 let actor_splits = init_split_assignment
1451 .values()
1452 .flat_map(build_actor_connector_splits)
1453 .collect();
1454 Some(Mutation::Update(UpdateMutation {
1455 actor_new_dispatchers,
1456 merge_update: merge_updates.into_values().flatten().collect(),
1457 dropped_actors,
1458 actor_splits,
1459 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1460 sink_schema_change: auto_refresh_schema_sinks
1461 .as_ref()
1462 .into_iter()
1463 .flat_map(|sinks| {
1464 sinks.iter().map(|sink| {
1465 (
1466 sink.original_sink.id.as_raw_id(),
1467 PbSinkSchemaChange {
1468 original_schema: sink
1469 .original_sink
1470 .columns
1471 .iter()
1472 .map(|col| PbField {
1473 data_type: Some(
1474 col.column_desc
1475 .as_ref()
1476 .unwrap()
1477 .column_type
1478 .as_ref()
1479 .unwrap()
1480 .clone(),
1481 ),
1482 name: col.column_desc.as_ref().unwrap().name.clone(),
1483 })
1484 .collect(),
1485 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1486 fields: sink
1487 .newly_add_fields
1488 .iter()
1489 .map(|field| field.to_prost())
1490 .collect(),
1491 })),
1492 },
1493 )
1494 })
1495 })
1496 .collect(),
1497 ..Default::default()
1498 }))
1499 }
1500
1501 pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1503 match self {
1504 Command::DropStreamingJobs {
1505 streaming_job_ids, ..
1506 } => Some(streaming_job_ids.iter().cloned()),
1507 _ => None,
1508 }
1509 .into_iter()
1510 .flatten()
1511 }
1512}
1513
1514impl Command {
1515 #[expect(clippy::type_complexity)]
1516 pub(super) fn collect_database_partial_graph_actor_upstreams(
1517 actor_dispatchers: impl Iterator<
1518 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1519 >,
1520 reschedule_dispatcher_update: Option<(
1521 &HashMap<FragmentId, Reschedule>,
1522 &HashMap<FragmentId, HashSet<ActorId>>,
1523 )>,
1524 database_info: &InflightDatabaseInfo,
1525 control_stream_manager: &ControlStreamManager,
1526 ) -> HashMap<ActorId, ActorUpstreams> {
1527 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1528 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1529 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1530 for (upstream_actor_id, dispatchers) in upstream_actors {
1531 let upstream_actor_location =
1532 upstream_fragment.actors[&upstream_actor_id].worker_id;
1533 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1534 for downstream_actor_id in dispatchers
1535 .iter()
1536 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1537 {
1538 actor_upstreams
1539 .entry(*downstream_actor_id)
1540 .or_default()
1541 .entry(upstream_fragment_id)
1542 .or_default()
1543 .insert(
1544 upstream_actor_id,
1545 PbActorInfo {
1546 actor_id: upstream_actor_id,
1547 host: Some(upstream_actor_host.clone()),
1548 partial_graph_id: to_partial_graph_id(None),
1549 },
1550 );
1551 }
1552 }
1553 }
1554 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1555 for reschedule in reschedules.values() {
1556 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1557 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1558 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1559 for upstream_actor_id in fragment_actors
1560 .get(upstream_fragment_id)
1561 .expect("should exist")
1562 {
1563 let upstream_actor_location =
1564 upstream_fragment.actors[upstream_actor_id].worker_id;
1565 let upstream_actor_host =
1566 control_stream_manager.host_addr(upstream_actor_location);
1567 if let Some(upstream_reschedule) = upstream_reschedule
1568 && upstream_reschedule
1569 .removed_actors
1570 .contains(upstream_actor_id)
1571 {
1572 continue;
1573 }
1574 for (_, downstream_actor_id) in
1575 reschedule
1576 .added_actors
1577 .iter()
1578 .flat_map(|(worker_id, actors)| {
1579 actors.iter().map(|actor| (*worker_id, *actor))
1580 })
1581 {
1582 actor_upstreams
1583 .entry(downstream_actor_id)
1584 .or_default()
1585 .entry(*upstream_fragment_id)
1586 .or_default()
1587 .insert(
1588 *upstream_actor_id,
1589 PbActorInfo {
1590 actor_id: *upstream_actor_id,
1591 host: Some(upstream_actor_host.clone()),
1592 partial_graph_id: to_partial_graph_id(None),
1593 },
1594 );
1595 }
1596 }
1597 }
1598 }
1599 }
1600 actor_upstreams
1601 }
1602}