use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, build_actor_connector_splits,
};

/// [`Reschedule`] is the per-fragment payload of the [`Command::RescheduleFragment`] command.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, grouped by the worker they are scheduled to.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: Vec<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatchers to be updated.
    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors. They become the `actor_splits` of the
    /// [`UpdateMutation`].
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, together with the worker they are scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

/// Replacing an old streaming job with a new one. All actors of the job will be rebuilt.
///
/// Used by `ALTER`-style replacement ([`Command::ReplaceStreamJob`]) and by sink-into-table
/// ([`Command::CreateStreamingJob`] with [`CreateStreamingJobType::SinkIntoTable`]).
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream fragments of the replaced job need to update their `Merge` nodes to
    /// connect to the new fragments.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the actors of the new fragments.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the job to be replaced.
    pub streaming_job: StreamingJob,
    /// Temporary id used for the new job fragments before the replacement is committed.
    pub tmp_id: u32,
    /// The state table ids to be dropped together with the replacement.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            let r =
                fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
            if let Some(r) = r {
                assert_eq!(
                    *new_upstream_fragment_id, r,
                    "one fragment is replaced by multiple fragments"
                );
            }
        }
        fragment_replacements
    }
}

/// The information of a streaming job to be created, used by [`Command::CreateStreamingJob`].
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
}

impl StreamJobFragments {
    /// Build the in-flight fragment info for each fragment of this job.
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`. The epoch is `None` at the beginning
    /// and is filled when the barrier to inject is resolved.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

/// How a streaming job is created: a normal job, a sink into an existing table (which also
/// replaces the table job), or a job backfilled from an upstream snapshot.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

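// A hedged overview of the command-to-mutation mapping implemented in
// `Command::to_mutation` below (all names are from this file):
//
//     Command::Pause              -> Mutation::Pause     (only when not already paused)
//     Command::Resume             -> Mutation::Resume    (only when currently paused)
//     Command::DropStreamingJobs  -> Mutation::Stop
//     Command::SourceChangeSplit  -> Mutation::Splits
//     Command::Throttle           -> Mutation::Throttle
//     Command::RescheduleFragment -> Mutation::Update
//     Command::ReplaceStreamJob   -> Mutation::Update
//     Command::CreateStreamingJob -> Mutation::Add (combined with an update for sink-into-table)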
/// [`Command`] is the input of the barrier manager. For different commands, a barrier carrying
/// a different [`Mutation`] will be built and injected into the stream graph.
#[derive(Debug, strum::Display)]
pub enum Command {
    /// `Flush` command forces a checkpoint barrier. After the barrier is collected and
    /// committed, all messages before it should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if** the cluster is not already
    /// paused.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only if** the cluster is paused.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier that stops the given actors;
    /// the listed state tables and fragments are unregistered afterwards.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier that attaches the newly created
    /// actors. For sink-into-table, it is combined with an `Update` barrier that replaces the
    /// downstream table job.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Merge finished snapshot-backfill jobs into the normal stream graph by dropping their
    /// temporary subscriptions on the upstream materialized views.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `RescheduleFragment` command generates an `Update` barrier that migrates or scales the
    /// actors of the given fragments.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Should contain the actor ids in the upstream and downstream fragments of the
        /// reschedules.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier that rebuilds all actors of a
    /// streaming job, e.g. for `ALTER TABLE`.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` command generates a `Splits` barrier that reassigns source splits
    /// to actors.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` command generates a `Throttle` barrier that updates the rate limits of the
    /// given actors.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates an `Add` barrier that registers the subscription
    /// on its upstream materialized view.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptions` barrier that unregisters the
    /// subscription from its upstream materialized view.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

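    /// Build the command to cancel a creating streaming job: stop all of its actors and
    /// unregister all of its fragments and state tables.
    ///
    /// A hedged usage sketch (`job_fragments` is a hypothetical `StreamJobFragments`):
    ///
    /// ```ignore
    /// let command = Command::cancel(&job_fragments);
    /// assert!(command.need_checkpoint());
    /// ```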
    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

    /// Compute the changes to the in-flight fragment infos implied by this command, or `None`
    /// if the command does not change the set of fragments.
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the bitmap updates of existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
        }
    }

    /// Whether this command forces a checkpoint barrier. Currently all commands except
    /// `Resume` do.
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

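// A hedged usage sketch of `BarrierKind` (illustrative only):
//
//     let kind = BarrierKind::Checkpoint(vec![100, 101]);
//     assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
//     assert!(kind.is_checkpoint());
//     assert_eq!(kind.as_str_name(), "Checkpoint");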
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Carries the epochs of the barriers to be committed at this checkpoint.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] carries everything needed to generate the barrier for, and commit the
/// results of, the given [`Command`].
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command, covering the lifetime of the corresponding barrier
    /// from injection to collection.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

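    /// Compute the epoch before which the log store of a subscribed table may be truncated:
    /// the physical time of `prev_epoch` shifted back by `retention_second`, falling back to
    /// `prev_epoch` itself when the shifted timestamp is out of range.
    ///
    /// A hedged numeric sketch: with `prev_epoch` at unix time `t` seconds and
    /// `retention_second = 86400`, the result is `Epoch::from_unix_millis((t - 86400) * 1000)`.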
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

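    /// Fill `info` with what this barrier commits: synced SSTs, new table watermarks, new
    /// table fragment infos, and per-table change log deltas whose truncate epochs are
    /// derived from subscriptions and pinned backfill epochs.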
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // A table may be depended on by multiple subscribers and backfill jobs; keep the
        // smallest truncate epoch so that no retained log is truncated away.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

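// For `CreateStreamingJobType::SinkIntoTable`, `to_mutation` below emits a combined
// mutation; a hedged sketch of its shape:
//
//     Mutation::Combined(CombinedMutation {
//         mutations: vec![
//             BarrierMutation { mutation: add },    // `Add` for the new sink actors
//             BarrierMutation { mutation: update }, // `Update` for the replaced table job
//         ],
//     })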
impl Command {
    /// Generate the mutation to be carried by the barrier for this command, or `None` if no
    /// mutation is needed (e.g. `Flush`, or a no-op `Pause`/`Resume`).
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is currently paused, the new actors should start paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // Record updates for all upstream actors.
                        for &actor_id in upstream_actor_ids {
                            // Index with the dispatcher id to check the no-duplication invariant.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id: reschedule
                                            .added_actors
                                            .values()
                                            .flatten()
                                            .cloned()
                                            .collect(),
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .clone(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // The downstream fragment may be rescheduled at the same time;
                        // exclude its removed actors from the merge updates.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all remaining downstream actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check the no-duplication invariant.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actor_id: reschedule
                                            .added_actors
                                            .values()
                                            .flatten()
                                            .cloned()
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .clone(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record bitmap updates for all existing actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }

                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();

                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // No new dispatchers are attached to existing actors during reschedule.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
        }
    }

    /// Collect the actors to be created by this command, grouped by worker and fragment, or
    /// `None` if the command creates no actors.
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // Actors for snapshot backfill are created in a separate flow.
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|((actor, dispatchers), _)| {
                                (actor.actor_id, *fragment_id, dispatchers.as_slice())
                            })
                    }),
                    Some((reschedules, fragment_actors)),
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

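    // A hedged sketch of the `UpdateMutation` assembled below for a replace-table plan:
    // all actors of the old fragments are stopped (`dropped_actors`), downstream `Merge`
    // executors are rewired (`merge_update`), and dispatchers plus initial source splits
    // are installed for the newly built actors.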
    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

    /// Returns the table ids to be dropped by this command, if any.
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

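// `collect_actor_upstreams` below inverts dispatcher edges: from (upstream actor -> its
// downstream actors) to (downstream actor -> upstream fragment -> upstream actors).
// A hedged sketch with made-up ids:
//
//     actor 1 in fragment 10 dispatches to actors {2, 3}
//       => actor_upstreams[2][10] contains 1
//       => actor_upstreams[3][10] contains 1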
impl Command {
    #[expect(clippy::type_complexity)]
    pub fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<Item = (ActorId, FragmentId, &[Dispatcher])>,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_actor_id, upstream_fragment_id, dispatchers) in actor_dispatchers {
            for downstream_actor_id in dispatchers
                .iter()
                .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
            {
                actor_upstreams
                    .entry(*downstream_actor_id)
                    .or_default()
                    .entry(upstream_fragment_id)
                    .or_default()
                    .insert(upstream_actor_id);
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        // Skip upstream actors that are removed by the upstream reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for downstream_actor_id in reschedule.added_actors.values().flatten() {
                            actor_upstreams
                                .entry(*downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(*upstream_actor_id);
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}