1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50 StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
69 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73 SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
74};
75use crate::{MetaError, MetaResult};
76
/// The rescheduling of a single fragment: which actors are added or removed,
/// how vnode ownership and source splits are redistributed, and which
/// neighboring fragments must update their dispatchers / merge executors.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Newly added actors, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors of this fragment removed by the reschedule.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps for surviving actors whose vnode ownership changes.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream_fragment_id, dispatcher_id)` pairs whose dispatchers must be
    /// updated to reflect the new actor set (see `reschedule_to_mutation`).
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New actor mapping for upstream hash dispatchers, if any.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors must be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source split assignment per actor after the reschedule.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors created by this reschedule, each with its dispatchers and the
    /// worker it is placed on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
108
/// A fully computed reschedule plan: the per-fragment [`Reschedule`]s plus the
/// actor sets of all involved fragments, used to build dispatcher and merge
/// updates.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Reschedule description for each affected fragment.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Actor set of each involved fragment, consulted when generating
    /// dispatcher/merge updates for up-/downstream fragments.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
116
/// All metadata a reschedule needs: the loaded fragment contexts per job plus
/// the up-/downstream fragment topology and relation models.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded per-job fragment contexts (see [`LoadedFragmentContext`]).
    pub loaded: LoadedFragmentContext,
    /// Extra catalog info for each streaming job involved.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments and the dispatcher type on each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments and the dispatcher type on each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation models keyed by `(source_fragment_id, target_fragment_id)`.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
126
impl RescheduleContext {
    /// An empty context with no loaded fragments or topology info.
    pub fn empty() -> Self {
        Self {
            loaded: LoadedFragmentContext::default(),
            job_extra_info: HashMap::new(),
            upstream_fragments: HashMap::new(),
            downstream_fragments: HashMap::new(),
            downstream_relations: HashMap::new(),
        }
    }

    /// Whether anything was loaded. The auxiliary maps are derived from
    /// `loaded`, so checking `loaded` alone is sufficient.
    pub fn is_empty(&self) -> bool {
        self.loaded.is_empty()
    }

    /// Projects this context onto a single database, keeping only the jobs,
    /// fragments and relations that belong to it. Returns `None` when the
    /// loaded context has nothing for `database_id`.
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        let loaded = self.loaded.for_database(database_id)?;
        // Jobs and fragments belonging to this database; used to filter the
        // auxiliary maps below.
        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
        let fragment_ids: HashSet<FragmentId> = loaded
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        let job_extra_info = self
            .job_extra_info
            .iter()
            .filter(|(job_id, _)| job_ids.contains(*job_id))
            .map(|(job_id, info)| (*job_id, info.clone()))
            .collect();

        let upstream_fragments = self
            .upstream_fragments
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
            .collect();

        let downstream_fragments = self
            .downstream_fragments
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
            .collect();

        // Relations are attributed to the database of their *source* fragment.
        let downstream_relations = self
            .downstream_relations
            .iter()
            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
            .map(|(key, relation)| (*key, relation.clone()))
            .collect();

        Some(Self {
            loaded,
            job_extra_info,
            upstream_fragments,
            downstream_fragments,
            downstream_relations,
        })
    }

    /// Splits this context into one context per database. Auxiliary entries
    /// whose owning job/fragment is not found in any database context are
    /// silently dropped.
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            loaded,
            job_extra_info,
            upstream_fragments,
            downstream_fragments,
            downstream_relations,
        } = self;

        // Start with one context per database, initially carrying only the
        // loaded fragment context; auxiliary maps are routed afterwards.
        let mut contexts: HashMap<_, _> = loaded
            .into_database_contexts()
            .into_iter()
            .map(|(database_id, loaded)| {
                (
                    database_id,
                    Self {
                        loaded,
                        job_extra_info: HashMap::new(),
                        upstream_fragments: HashMap::new(),
                        downstream_fragments: HashMap::new(),
                        downstream_relations: HashMap::new(),
                    },
                )
            })
            .collect();

        // Nothing to route into — skip building the reverse indexes.
        if contexts.is_empty() {
            return contexts;
        }

        // Reverse indexes: owning database of every job and fragment.
        let mut job_databases = HashMap::new();
        let mut fragment_databases = HashMap::new();
        for (&database_id, context) in &contexts {
            for job_id in context.loaded.job_map.keys().copied() {
                job_databases.insert(job_id, database_id);
            }
            for fragment_id in context
                .loaded
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.keys().copied())
            {
                fragment_databases.insert(fragment_id, database_id);
            }
        }

        for (job_id, info) in job_extra_info {
            if let Some(database_id) = job_databases.get(&job_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for job")
                    .job_extra_info
                    .insert(job_id, info);
            }
        }

        for (fragment_id, upstreams) in upstream_fragments {
            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for fragment")
                    .upstream_fragments
                    .insert(fragment_id, upstreams);
            }
        }

        for (fragment_id, downstreams) in downstream_fragments {
            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for fragment")
                    .downstream_fragments
                    .insert(fragment_id, downstreams);
            }
        }

        // Relations follow the database of their source fragment, mirroring
        // the attribution in `for_database`.
        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for relation source")
                    .downstream_relations
                    .insert((source_fragment_id, target_fragment_id), relation);
            }
        }

        contexts
    }
}
286
/// Plan for replacing a streaming job's fragments in place: the old fragments
/// are dropped and the new ones take over the upstream/downstream edges.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Per fragment, mapping from old upstream fragment id to its replacement
    /// (flattened by [`ReplaceStreamJobPlan::fragment_replacements`]).
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source-split assignment for the new fragments.
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    // Presumably a temporary job id used while the replacement is in flight —
    // TODO confirm with callers.
    pub tmp_id: JobId,
    // State tables to drop with the old fragments, judging by the name —
    // NOTE(review): consumer not visible in this file.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed along with this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
315
316impl ReplaceStreamJobPlan {
317 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
319 let mut fragment_replacements = HashMap::new();
320 for (upstream_fragment_id, new_upstream_fragment_id) in
321 self.replace_upstream.values().flatten()
322 {
323 {
324 let r =
325 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
326 if let Some(r) = r {
327 assert_eq!(
328 *new_upstream_fragment_id, r,
329 "one fragment is replaced by multiple fragments"
330 );
331 }
332 }
333 }
334 fragment_replacements
335 }
336}
337
/// Everything needed to create a streaming job through a barrier command.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments/actors to create; excluded from `Debug` output for brevity.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Dispatch relation from existing upstream fragments to the new job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source-split assignment for the new actors.
    pub init_split_assignment: SplitAssignment,
    /// SQL definition of the job.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering; nodes with dependencies start paused (see
    /// `create_streaming_job_to_mutation`).
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// CDC table snapshot splits, when the job is a CDC table backfill.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    // Presumably maps locality-related fragments to their state tables —
    // TODO confirm; consumer not visible in this file.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}
354
impl StreamJobFragments {
    /// Builds the in-flight info of every fragment of this job, pairing each
    /// actor with its worker, vnode bitmap and the source splits assigned to
    /// it in `assignment`.
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            // Splits assigned to this fragment; entries are moved out per
            // actor below, so actors without an entry get an empty list.
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    // Invariant: every actor of the job has a
                                    // recorded status with its worker.
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}
400
/// Info for a snapshot-backfill job: the upstream MV tables it backfills from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream MV table id -> backfill epoch. `None` presumably means the
    /// epoch is not yet determined — TODO confirm with the producer of this
    /// struct.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
408
/// How a streaming job is created, which determines the extra payload of the
/// `Add` mutation (see `create_streaming_job_to_mutation`).
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Regular creation with no extra payload.
    Normal,
    /// Sink-into-table: the downstream fragment learns about the new upstream
    /// sink actors.
    SinkIntoTable(UpstreamSinkInfo),
    /// Snapshot backfill: implicit subscriptions are added on the upstream MVs.
    SnapshotBackfill(SnapshotBackfillInfo),
}
415
/// A command to be executed through a barrier across the streaming graph.
/// The corresponding barrier mutations are built by the `*_to_mutation`
/// helpers in the `impl Command` block below.
#[derive(Debug)]
pub enum Command {
    /// Barrier with no extra payload; requires a checkpoint (see
    /// [`Command::need_checkpoint`]).
    Flush,

    /// Pause the streaming graph; emits no mutation when already paused.
    Pause,

    /// Resume a paused streaming graph; the only command whose barrier does
    /// not require a checkpoint.
    Resume,

    /// Drop streaming jobs and stop their actors.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        /// All actors to stop; carried as-is in the `Stop` mutation.
        actors: Vec<ActorId>,
        // State tables / fragments that become unregistered by the drop —
        // NOTE(review): their consumers are outside this file.
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// For each target fragment, the sink fragments dropped from it;
        /// flattened into `dropped_sink_fragments` of the `Stop` mutation.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Create a new streaming job via an `Add` mutation.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        // Presumably snapshot-backfill upstreams living in other databases —
        // TODO confirm with the scheduler.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Intent to reschedule; `reschedule_plan` is `None` until a concrete plan
    /// has been computed from `context`.
    RescheduleIntent {
        context: RescheduleContext,
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replace the fragments of an existing job (see [`ReplaceStreamJobPlan`]).
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Propagate changed source-split assignments to source actors.
    SourceChangeSplit(SplitState),

    /// Update per-fragment throttle configuration of the given jobs.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Create a subscription on an upstream MV table with the given retention.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drop a subscription from its upstream MV table.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Change the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Apply changed connector properties to running connectors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Refresh a table backed by `associated_source_id`.
    /// NOTE(review): the refresh protocol itself lives outside this file.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // Presumably signals the end of the "list" phase of a refresh — confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // Presumably signals the end of the "load" phase of a refresh — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    // Presumably resets the state of a source — confirm with handler.
    ResetSource {
        source_id: SourceId,
    },

    /// Resume a paused backfill for a whole job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Inject offsets for specific splits of a source.
    /// NOTE(review): assumed to map split id -> offset string — confirm with
    /// callers.
    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}
556
/// Target granularity of a `ResumeBackfill` command.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for the whole job.
    Job(JobId),
    /// Resume backfill for a single fragment.
    Fragment(FragmentId),
}
562
563impl std::fmt::Display for Command {
565 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566 match self {
567 Command::Flush => write!(f, "Flush"),
568 Command::Pause => write!(f, "Pause"),
569 Command::Resume => write!(f, "Resume"),
570 Command::DropStreamingJobs {
571 streaming_job_ids, ..
572 } => {
573 write!(
574 f,
575 "DropStreamingJobs: {}",
576 streaming_job_ids.iter().sorted().join(", ")
577 )
578 }
579 Command::CreateStreamingJob { info, .. } => {
580 write!(f, "CreateStreamingJob: {}", info.streaming_job)
581 }
582 Command::RescheduleIntent {
583 reschedule_plan, ..
584 } => {
585 if reschedule_plan.is_some() {
586 write!(f, "RescheduleIntent(planned)")
587 } else {
588 write!(f, "RescheduleIntent")
589 }
590 }
591 Command::ReplaceStreamJob(plan) => {
592 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
593 }
594 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
595 Command::Throttle { .. } => write!(f, "Throttle"),
596 Command::CreateSubscription {
597 subscription_id, ..
598 } => write!(f, "CreateSubscription: {subscription_id}"),
599 Command::DropSubscription {
600 subscription_id, ..
601 } => write!(f, "DropSubscription: {subscription_id}"),
602 Command::AlterSubscriptionRetention {
603 subscription_id,
604 retention_second,
605 ..
606 } => write!(
607 f,
608 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
609 ),
610 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
611 Command::Refresh {
612 table_id,
613 associated_source_id,
614 } => write!(
615 f,
616 "Refresh: {} (source: {})",
617 table_id, associated_source_id
618 ),
619 Command::ListFinish {
620 table_id,
621 associated_source_id,
622 } => write!(
623 f,
624 "ListFinish: {} (source: {})",
625 table_id, associated_source_id
626 ),
627 Command::LoadFinish {
628 table_id,
629 associated_source_id,
630 } => write!(
631 f,
632 "LoadFinish: {} (source: {})",
633 table_id, associated_source_id
634 ),
635 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
636 Command::ResumeBackfill { target } => match target {
637 ResumeBackfillTarget::Job(job_id) => {
638 write!(f, "ResumeBackfill: job={job_id}")
639 }
640 ResumeBackfillTarget::Fragment(fragment_id) => {
641 write!(f, "ResumeBackfill: fragment={fragment_id}")
642 }
643 },
644 Command::InjectSourceOffsets {
645 source_id,
646 split_offsets,
647 } => write!(
648 f,
649 "InjectSourceOffsets: {} ({} splits)",
650 source_id,
651 split_offsets.len()
652 ),
653 }
654 }
655}
656
657impl Command {
658 pub fn pause() -> Self {
659 Self::Pause
660 }
661
662 pub fn resume() -> Self {
663 Self::Resume
664 }
665
666 pub fn need_checkpoint(&self) -> bool {
667 !matches!(self, Command::Resume)
669 }
670}
671
/// The command information retained after a barrier has been collected from
/// all workers; consumed during finalization (e.g. by
/// `CommandContext::collect_commit_epoch_info`).
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command identified only by name, with no post-collect payload
    /// (see [`PostCollectCommand::barrier`]).
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    /// Retained creation info; drives new-table-fragment registration and
    /// vector-index init at commit time.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob(ReplaceStreamJobPlan),
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
699
700impl PostCollectCommand {
701 pub fn barrier() -> Self {
702 PostCollectCommand::Command("barrier".to_owned())
703 }
704
705 pub fn command_name(&self) -> &str {
706 match self {
707 PostCollectCommand::Command(name) => name.as_str(),
708 PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
709 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
710 PostCollectCommand::Reschedule { .. } => "Reschedule",
711 PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
712 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
713 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
714 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
715 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
716 }
717 }
718}
719
720impl Display for PostCollectCommand {
721 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
722 f.write_str(self.command_name())
723 }
724}
725
/// The kind of a barrier, mirrored into [`PbBarrierKind`] when injected.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    // Presumably the very first barrier after (re)starting — TODO confirm.
    Initial,
    /// A regular barrier that does not trigger a checkpoint.
    Barrier,
    /// A checkpoint barrier; carries the epochs covered by this checkpoint,
    /// used to build the table change-log delta at commit time.
    Checkpoint(Vec<u64>),
}
733
734impl BarrierKind {
735 pub fn to_protobuf(&self) -> PbBarrierKind {
736 match self {
737 BarrierKind::Initial => PbBarrierKind::Initial,
738 BarrierKind::Barrier => PbBarrierKind::Barrier,
739 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
740 }
741 }
742
743 pub fn is_checkpoint(&self) -> bool {
744 matches!(self, BarrierKind::Checkpoint(_))
745 }
746
747 pub fn is_initial(&self) -> bool {
748 matches!(self, BarrierKind::Initial)
749 }
750
751 pub fn as_str_name(&self) -> &'static str {
752 match self {
753 BarrierKind::Initial => "Initial",
754 BarrierKind::Barrier => "Barrier",
755 BarrierKind::Checkpoint(_) => "Checkpoint",
756 }
757 }
758}
759
/// Context of an injected barrier, kept until the barrier is collected and its
/// effects are committed.
pub(super) struct CommandContext {
    // Max log-retention seconds per upstream MV table across its
    // subscriptions; used to derive log-store truncate epochs on commit.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    // Tables whose state is committed at this barrier's prev epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    // Stored only to keep the tracing span alive for the lifetime of this
    // context; never read.
    _span: tracing::Span,
}
778
779impl std::fmt::Debug for CommandContext {
780 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
781 f.debug_struct("CommandContext")
782 .field("barrier_info", &self.barrier_info)
783 .field("command", &self.command.command_name())
784 .finish()
785 }
786}
787
impl CommandContext {
    /// Creates the context for a newly injected barrier.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Epoch before which subscription logs may be truncated: the prev epoch's
    /// timestamp minus `retention_second`. Falls back to the prev epoch itself
    /// (i.e. no truncation window) when the subtraction yields an invalid
    /// timestamp.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Aggregates the barrier-complete responses into `info` for the hummock
    /// commit: synced SSTs, table watermarks, change-log deltas, vector-index
    /// deltas and the set of tables committed at this epoch.
    ///
    /// Must only be called for checkpoint barriers — the `must_match!` below
    /// panics for any other [`BarrierKind`].
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created job (never snapshot backfill, asserted below)
        // registers its MV table plus internal tables as new table fragments.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // Per MV table, the epoch up to which its log store may be truncated.
        // The helper keeps the *minimum* over all constraints: subscription
        // retention and epochs pinned by in-flight backfills.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // All participating tables commit at the prev epoch of this barrier.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally records its init delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
917
918impl Command {
919 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
921 {
922 {
923 if !is_currently_paused {
926 Some(Mutation::Pause(PauseMutation {}))
927 } else {
928 None
929 }
930 }
931 }
932 }
933
934 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
936 {
937 {
938 if is_currently_paused {
940 Some(Mutation::Resume(ResumeMutation {}))
941 } else {
942 None
943 }
944 }
945 }
946 }
947
948 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
950 {
951 {
952 let mut diff = HashMap::new();
953
954 for actor_splits in split_assignment.values() {
955 diff.extend(actor_splits.clone());
956 }
957
958 Mutation::Splits(SourceChangeSplitMutation {
959 actor_splits: build_actor_connector_splits(&diff),
960 })
961 }
962 }
963 }
964
965 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
967 {
968 {
969 let config = config.clone();
970 Mutation::Throttle(ThrottleMutation {
971 fragment_throttle: config,
972 })
973 }
974 }
975 }
976
977 pub(super) fn drop_streaming_jobs_to_mutation(
979 actors: &Vec<ActorId>,
980 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
981 ) -> Mutation {
982 {
983 Mutation::Stop(StopMutation {
984 actors: actors.clone(),
985 dropped_sink_fragments: dropped_sink_fragment_by_targets
986 .values()
987 .flatten()
988 .cloned()
989 .collect(),
990 })
991 }
992 }
993
    /// Builds the `Add` barrier mutation that brings a newly created streaming
    /// job online: dispatchers for the upstream fragments, the set of added
    /// actors, their source-split assignment and, depending on `job_type`,
    /// snapshot-backfill subscriptions or sink-into-table upstream info.
    ///
    /// Note: `edges` is partially consumed — dispatchers of fragments listed
    /// in `upstream_fragment_downstreams` are moved out of it.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    init_split_assignment: split_assignment,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs implicitly subscribe to each upstream
                // MV they backfill from, with the job itself as subscriber.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes that depend on other backfills start paused;
                // presumably resumed later via `StartFragmentBackfill` — TODO
                // confirm outside this file.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // Sink-into-table: tell the downstream fragment about the new
                // upstream sink actors and their output schema/projection.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        // Invariant: the job contains exactly one fragment with
                        // the sink fragment id.
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move the dispatchers of the upstream fragments that now
                    // point at the new job out of the edge build result.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // New actors start paused when the graph is paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1104
    /// Builds the update mutation for replacing a stream job: merge updates
    /// for fragments whose upstream is replaced, dispatchers for the new
    /// fragments, the actors to drop (old fragments plus the original
    /// fragments of any auto-refresh-schema sinks), and re-assigned CDC
    /// backfill splits.
    ///
    /// Note: `edges` is partially consumed — the relevant merge updates and
    /// dispatchers are moved out of it.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            init_split_assignment,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Ok(Self::generate_update_mutation_for_replace_table(
                    // Actors to drop: all actors of the old fragments, plus the
                    // original sink fragments being auto-refreshed.
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1156
    /// Translate a set of fragment reschedules into a single `Update` barrier
    /// mutation covering dispatcher updates, merge updates, vnode-bitmap
    /// changes, dropped actors, and split (re)assignments.
    ///
    /// `fragment_actors` must contain the actor set for every fragment
    /// referenced by `reschedules` (both upstream and downstream fragments);
    /// missing entries panic via `expect`.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                // For every upstream dispatcher that feeds a rescheduled fragment,
                // record which downstream actors were added/removed plus the new
                // hash mapping (if any), keyed by (actor, dispatcher).
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // The upstream fragment may itself be rescheduled.
                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // Only upstream actors that survive their own reschedule
                            // (or have none) get the added downstream actors; actors
                            // being removed get an empty list.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // try_insert: each (actor, dispatcher) pair must be
                            // produced at most once — a duplicate is a bug.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                // Mirror image of the above: for every downstream fragment of a
                // rescheduled fragment, tell its surviving actors which upstream
                // actors were added/removed, keyed by (actor, upstream fragment).
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Actors removed by the downstream fragment's own reschedule
                        // don't need a merge update.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // try_insert: duplicates indicate a logic error upstream.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                // New vnode ownership bitmaps for actors whose vnode set changed.
                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Connector split assignments for actors of rescheduled source
                // fragments, plus CDC backfill snapshot splits where applicable.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // NOTE: takes `&mut database_info` — assignment may mutate
                    // the in-flight CDC split bookkeeping.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // Reschedules never create brand-new dispatchers, only update
                // existing ones.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1338
1339 pub(super) fn create_subscription_to_mutation(
1341 upstream_mv_table_id: TableId,
1342 subscription_id: SubscriptionId,
1343 ) -> Mutation {
1344 {
1345 Mutation::Add(AddMutation {
1346 actor_dispatchers: Default::default(),
1347 added_actors: vec![],
1348 actor_splits: Default::default(),
1349 pause: false,
1350 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1351 upstream_mv_table_id,
1352 subscriber_id: subscription_id.as_subscriber_id(),
1353 }],
1354 backfill_nodes_to_pause: vec![],
1355 actor_cdc_table_snapshot_splits: None,
1356 new_upstream_sinks: Default::default(),
1357 })
1358 }
1359 }
1360
1361 pub(super) fn drop_subscription_to_mutation(
1363 upstream_mv_table_id: TableId,
1364 subscription_id: SubscriptionId,
1365 ) -> Mutation {
1366 {
1367 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1368 info: vec![SubscriptionUpstreamInfo {
1369 subscriber_id: subscription_id.as_subscriber_id(),
1370 upstream_mv_table_id,
1371 }],
1372 })
1373 }
1374 }
1375
1376 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1378 {
1379 {
1380 let mut connector_props_infos = HashMap::default();
1381 for (k, v) in config {
1382 connector_props_infos.insert(
1383 k.as_raw_id(),
1384 ConnectorPropsInfo {
1385 connector_props_info: v.clone(),
1386 },
1387 );
1388 }
1389 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1390 connector_props_infos,
1391 })
1392 }
1393 }
1394 }
1395
1396 pub(super) fn refresh_to_mutation(
1398 table_id: TableId,
1399 associated_source_id: SourceId,
1400 ) -> Mutation {
1401 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1402 table_id,
1403 associated_source_id,
1404 })
1405 }
1406
1407 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1409 Mutation::ListFinish(ListFinishMutation {
1410 associated_source_id,
1411 })
1412 }
1413
1414 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1416 Mutation::LoadFinish(LoadFinishMutation {
1417 associated_source_id,
1418 })
1419 }
1420
1421 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1423 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1424 source_id: source_id.as_raw_id(),
1425 })
1426 }
1427
1428 pub(super) fn resume_backfill_to_mutation(
1430 target: &ResumeBackfillTarget,
1431 database_info: &InflightDatabaseInfo,
1432 ) -> MetaResult<Option<Mutation>> {
1433 {
1434 {
1435 let fragment_ids: HashSet<_> = match target {
1436 ResumeBackfillTarget::Job(job_id) => {
1437 database_info.backfill_fragment_ids_for_job(*job_id)?
1438 }
1439 ResumeBackfillTarget::Fragment(fragment_id) => {
1440 if !database_info.is_backfill_fragment(*fragment_id)? {
1441 return Err(MetaError::invalid_parameter(format!(
1442 "fragment {} is not a backfill node",
1443 fragment_id
1444 )));
1445 }
1446 HashSet::from([*fragment_id])
1447 }
1448 };
1449 if fragment_ids.is_empty() {
1450 warn!(
1451 ?target,
1452 "resume backfill command ignored because no backfill fragments found"
1453 );
1454 Ok(None)
1455 } else {
1456 Ok(Some(Mutation::StartFragmentBackfill(
1457 StartFragmentBackfillMutation {
1458 fragment_ids: fragment_ids.into_iter().collect(),
1459 },
1460 )))
1461 }
1462 }
1463 }
1464 }
1465
1466 pub(super) fn inject_source_offsets_to_mutation(
1468 source_id: SourceId,
1469 split_offsets: &HashMap<String, String>,
1470 ) -> Mutation {
1471 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1472 source_id: source_id.as_raw_id(),
1473 split_offsets: split_offsets.clone(),
1474 })
1475 }
1476
1477 pub(super) fn create_streaming_job_actors_to_create(
1479 info: &CreateStreamingJobCommandInfo,
1480 edges: &mut FragmentEdgeBuildResult,
1481 ) -> StreamJobActorsToCreate {
1482 {
1483 {
1484 let actors_to_create = info.stream_job_fragments.actors_to_create();
1485 edges.collect_actors_to_create(actors_to_create.map(
1486 |(fragment_id, node, actors)| {
1487 (
1488 fragment_id,
1489 node,
1490 actors,
1491 [], )
1493 },
1494 ))
1495 }
1496 }
1497 }
1498
    /// Build the per-worker map of actors that must be newly created to apply
    /// the given reschedules: `worker -> fragment -> (node, actors, subscribers)`.
    ///
    /// `fragment_actors` must contain the actor sets for the fragments
    /// referenced by `reschedules`.
    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        {
            {
                // Resolve, for each newly created actor, the upstream actor infos
                // it must connect to — taking the reschedule-induced dispatcher
                // changes into account (second argument).
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                // worker id -> fragment id -> (fragment node, actor list, subscribers)
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    // Each actor's upstreams are consumed (removed) exactly once.
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            // Fragment node and subscribers are shared by all of the
                            // fragment's actors, so they are fetched once per
                            // (worker, fragment) entry.
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                map
            }
        }
    }
1549
    /// Build the per-worker actor-creation map for a replace-stream-job plan:
    /// all actors of the new fragments, plus the replacement sink fragments
    /// when the plan auto-refreshes sink schemas.
    pub(super) fn replace_stream_job_actors_to_create(
        replace_table: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &InflightDatabaseInfo,
    ) -> StreamJobActorsToCreate {
        {
            {
                // Actors of the new fragments take over the subscribers of the
                // job being replaced.
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    // Every sink actor is expected to carry a
                                    // resolved location here; a missing one panics.
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    // Merge the sink fragments' actors into the per-worker map
                    // built above.
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                actors
            }
        }
    }
1597
    /// Assemble the `Update` mutation applied when a stream job is replaced:
    /// drops the old actors, installs the merge updates and new dispatchers,
    /// carries the initial split assignment, and — for auto-refresh-schema
    /// sinks — announces the sink schema change.
    ///
    /// The body always returns `Some`; the `Option` return type matches call
    /// sites that may have no mutation to send.
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        // Flatten fragment -> (actor -> dispatchers) into actor -> Dispatchers.
        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            // One schema-change entry per auto-refresh sink: the sink's current
            // column schema plus an AddColumns op listing the new fields.
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        // column_desc/column_type are expected to
                                        // be populated; a missing one panics.
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            // Remaining UpdateMutation fields (vnode bitmaps etc.) stay default.
            ..Default::default()
        }))
    }
1664}
1665
impl Command {
    /// Compute, for every downstream actor, the upstream actor infos
    /// (`fragment -> actor -> PbActorInfo`) it should connect to.
    ///
    /// `actor_dispatchers` yields, per upstream fragment, each upstream actor
    /// together with its dispatchers; every dispatch target of those
    /// dispatchers gets an entry in the result. When
    /// `reschedule_dispatcher_update` is given, actors newly added by a
    /// reschedule are additionally wired to all upstream actors of the
    /// fragments feeding the rescheduled fragment, skipping upstream actors
    /// that are themselves being removed.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Pass 1: record every upstream actor under each of its dispatch targets.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Resolve the host address of the worker running the upstream actor.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Pass 2 (reschedule only): connect each actor added by a reschedule to
        // every surviving actor of its upstream fragments.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors removed by their own reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        // Wire this upstream actor to every newly added actor of
                        // the rescheduled fragment (worker id is unused here).
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}