1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50 StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68 FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
69 StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
74 build_actor_connector_splits,
75};
76use crate::{MetaError, MetaResult};
77
/// The per-fragment delta applied by a reschedule (scale in/out or actor
/// migration): which actors are added/removed, how vnodes are redistributed,
/// and how upstream/downstream fragments must be rewired.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Newly created actors, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps for surviving actors whose vnode ownership changed.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers in upstream fragments that point at this fragment and must be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New actor mapping for upstream dispatchers, if the hash distribution changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors must be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source split assignment for this fragment's actors after the reschedule.
    // NOTE(review): presumably covers both surviving and newly added actors — confirm at producer.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Full definitions of the newly created actors, with their target workers.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
109
/// A fully-resolved reschedule: per-fragment [`Reschedule`] deltas plus the
/// resulting actor membership of every fragment the plan refers to.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Actor set of each involved fragment after the reschedule is applied.
    // NOTE(review): assumed to include upstream/downstream fragments referenced by
    // the reschedules (see `reschedule_to_mutation`'s lookups) — confirm with producer.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
117
/// Pre-loaded catalog/topology state needed to plan and apply a reschedule.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Job/fragment state loaded from the metadata store.
    pub loaded: LoadedFragmentContext,
    /// Extra streaming-job metadata, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments with the connecting dispatcher type.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments with the connecting dispatcher type.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation rows keyed by `(source_fragment_id, target_fragment_id)`.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
127
128impl RescheduleContext {
129 pub fn empty() -> Self {
130 Self {
131 loaded: LoadedFragmentContext::default(),
132 job_extra_info: HashMap::new(),
133 upstream_fragments: HashMap::new(),
134 downstream_fragments: HashMap::new(),
135 downstream_relations: HashMap::new(),
136 }
137 }
138
139 pub fn is_empty(&self) -> bool {
140 self.loaded.is_empty()
141 }
142
143 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
144 let loaded = self.loaded.for_database(database_id)?;
145 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
146 let fragment_ids: HashSet<FragmentId> = loaded
149 .job_fragments
150 .values()
151 .flat_map(|fragments| fragments.keys().copied())
152 .collect();
153
154 let job_extra_info = self
155 .job_extra_info
156 .iter()
157 .filter(|(job_id, _)| job_ids.contains(*job_id))
158 .map(|(job_id, info)| (*job_id, info.clone()))
159 .collect();
160
161 let upstream_fragments = self
162 .upstream_fragments
163 .iter()
164 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
165 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
166 .collect();
167
168 let downstream_fragments = self
169 .downstream_fragments
170 .iter()
171 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
172 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
173 .collect();
174
175 let downstream_relations = self
176 .downstream_relations
177 .iter()
178 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
182 .map(|(key, relation)| (*key, relation.clone()))
183 .collect();
184
185 Some(Self {
186 loaded,
187 job_extra_info,
188 upstream_fragments,
189 downstream_fragments,
190 downstream_relations,
191 })
192 }
193
194 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
197 let Self {
198 loaded,
199 job_extra_info,
200 upstream_fragments,
201 downstream_fragments,
202 downstream_relations,
203 } = self;
204
205 let mut contexts: HashMap<_, _> = loaded
206 .into_database_contexts()
207 .into_iter()
208 .map(|(database_id, loaded)| {
209 (
210 database_id,
211 Self {
212 loaded,
213 job_extra_info: HashMap::new(),
214 upstream_fragments: HashMap::new(),
215 downstream_fragments: HashMap::new(),
216 downstream_relations: HashMap::new(),
217 },
218 )
219 })
220 .collect();
221
222 if contexts.is_empty() {
223 return contexts;
224 }
225
226 let mut job_databases = HashMap::new();
227 let mut fragment_databases = HashMap::new();
228 for (&database_id, context) in &contexts {
229 for job_id in context.loaded.job_map.keys().copied() {
230 job_databases.insert(job_id, database_id);
231 }
232 for fragment_id in context
233 .loaded
234 .job_fragments
235 .values()
236 .flat_map(|fragments| fragments.keys().copied())
237 {
238 fragment_databases.insert(fragment_id, database_id);
239 }
240 }
241
242 for (job_id, info) in job_extra_info {
243 if let Some(database_id) = job_databases.get(&job_id).copied() {
244 contexts
245 .get_mut(&database_id)
246 .expect("database context should exist for job")
247 .job_extra_info
248 .insert(job_id, info);
249 }
250 }
251
252 for (fragment_id, upstreams) in upstream_fragments {
253 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
254 contexts
255 .get_mut(&database_id)
256 .expect("database context should exist for fragment")
257 .upstream_fragments
258 .insert(fragment_id, upstreams);
259 }
260 }
261
262 for (fragment_id, downstreams) in downstream_fragments {
263 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
264 contexts
265 .get_mut(&database_id)
266 .expect("database context should exist for fragment")
267 .downstream_fragments
268 .insert(fragment_id, downstreams);
269 }
270 }
271
272 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
273 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
276 contexts
277 .get_mut(&database_id)
278 .expect("database context should exist for relation source")
279 .downstream_relations
280 .insert((source_fragment_id, target_fragment_id), relation);
281 }
282 }
283
284 contexts
285 }
286}
287
/// Plan for replacing the fragments of an existing streaming job (e.g. for a
/// schema change), rewiring upstream merges and downstream dispatchers.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Per fragment, maps old upstream fragment ids to their replacement fragment ids.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Existing upstream fragments that must gain dispatchers to the new fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub split_plan: ReplaceJobSplitPlan,
    pub streaming_job: StreamingJob,
    /// Temporary job id used for the new fragments until the swap commits.
    // NOTE(review): inferred from the name — confirm against the replace-job workflow.
    pub tmp_id: JobId,
    /// State tables of the old job that become unused after the replacement.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed as part of this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
313
314impl ReplaceStreamJobPlan {
315 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
317 let mut fragment_replacements = HashMap::new();
318 for (upstream_fragment_id, new_upstream_fragment_id) in
319 self.replace_upstream.values().flatten()
320 {
321 {
322 let r =
323 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
324 if let Some(r) = r {
325 assert_eq!(
326 *new_upstream_fragment_id, r,
327 "one fragment is replaced by multiple fragments"
328 );
329 }
330 }
331 }
332 fragment_replacements
333 }
334}
335
/// Everything needed to create a streaming job through a barrier command.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Excluded from Debug: the fragment graph is large.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Existing upstream fragments that must gain dispatchers to the new job's fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Initial source split assignment for the new job's source actors.
    pub init_split_assignment: SourceSplitAssignment,
    /// Original SQL definition of the job.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering constraints between fragments (see `backfill_order_control`).
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, when this job backfills a CDC table.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// State tables grouped per locality-backfill fragment.
    // NOTE(review): inferred from the field name — confirm semantics at the producer.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}
353
354impl StreamJobFragments {
355 pub(super) fn new_fragment_info<'a>(
357 &'a self,
358 assignment: &'a SplitAssignment,
359 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
360 self.fragments.values().map(|fragment| {
361 let mut fragment_splits = assignment
362 .get(&fragment.fragment_id)
363 .cloned()
364 .unwrap_or_default();
365
366 (
367 fragment.fragment_id,
368 InflightFragmentInfo {
369 fragment_id: fragment.fragment_id,
370 distribution_type: fragment.distribution_type.into(),
371 fragment_type_mask: fragment.fragment_type_mask,
372 vnode_count: fragment.vnode_count(),
373 nodes: fragment.nodes.clone(),
374 actors: fragment
375 .actors
376 .iter()
377 .map(|actor| {
378 (
379 actor.actor_id,
380 InflightActorInfo {
381 worker_id: self
382 .actor_status
383 .get(&actor.actor_id)
384 .expect("should exist")
385 .worker_id(),
386 vnode_bitmap: actor.vnode_bitmap.clone(),
387 splits: fragment_splits
388 .remove(&actor.actor_id)
389 .unwrap_or_default(),
390 },
391 )
392 })
393 .collect(),
394 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
395 },
396 )
397 })
398 }
399}
400
/// Upstream materialized views that a job snapshot-backfills from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Maps each upstream mv table to the epoch its snapshot is read at.
    // NOTE(review): the `None -> Some` lifecycle (epoch filled in at barrier
    // injection) is inferred from the type — confirm with the injection path.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
408
/// How a streaming job is being created.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Regular creation with plain (non-snapshot) backfill.
    Normal,
    /// A sink created into an existing table; carries the upstream sink wiring.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation backed by snapshot backfill from upstream materialized views.
    SnapshotBackfill(SnapshotBackfillInfo),
}
415
/// A command to execute along with a barrier. Each variant is turned into a
/// barrier mutation by the corresponding `*_to_mutation` helper on `Command`.
#[derive(Debug)]
pub enum Command {
    /// Inject a barrier with no mutation.
    Flush,

    /// Pause all stream processing (`Mutation::Pause`, no-op if already paused).
    Pause,

    /// Resume stream processing (`Mutation::Resume`, no-op if not paused).
    /// The only command that does not require a checkpoint barrier.
    Resume,

    /// Drop a set of streaming jobs: stop their actors and unregister their state.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        /// Actors to stop (receive a `Stop` mutation).
        actors: Vec<ActorId>,
        /// State tables to unregister.
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Dropped sink fragments, grouped by the target fragment they fed into.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Create a new streaming job (`Mutation::Add`).
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// Snapshot-backfill info for upstream tables in *other* databases.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Intent to reschedule fragments; the concrete plan may not be computed yet.
    RescheduleIntent {
        context: RescheduleContext,
        /// `None` until the reschedule plan has been computed.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replace the fragments of an existing job (e.g. schema change).
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Propagate changed source split assignments (`Mutation::Splits`).
    SourceChangeSplit(SplitState),

    /// Apply per-fragment throttle configuration (`Mutation::Throttle`).
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Create a subscription on an upstream materialized view.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drop a subscription.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Change the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Update connector properties of running sources/sinks.
    ConnectorPropsChange(ConnectorPropsChange),

    // NOTE(review): Refresh/ListFinish/LoadFinish semantics are inferred from
    // names and the corresponding pb mutations — confirm with the refresh workflow.
    /// Trigger a refresh of a refreshable table backed by the given source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signal that the listing phase of a refresh finished.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signal that the loading phase of a refresh finished.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Reset a source's state.
    ResetSource {
        source_id: SourceId,
    },

    /// Resume a paused backfill for a whole job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Inject consumption offsets for a source's splits.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Map from split id to the offset to inject.
        split_offsets: HashMap<String, String>,
    },
}
556
/// Target granularity for resuming a paused backfill.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for every fragment of the given job.
    Job(JobId),
    /// Resume backfill for a single fragment.
    Fragment(FragmentId),
}
562
563impl std::fmt::Display for Command {
565 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566 match self {
567 Command::Flush => write!(f, "Flush"),
568 Command::Pause => write!(f, "Pause"),
569 Command::Resume => write!(f, "Resume"),
570 Command::DropStreamingJobs {
571 streaming_job_ids, ..
572 } => {
573 write!(
574 f,
575 "DropStreamingJobs: {}",
576 streaming_job_ids.iter().sorted().join(", ")
577 )
578 }
579 Command::CreateStreamingJob { info, .. } => {
580 write!(f, "CreateStreamingJob: {}", info.streaming_job)
581 }
582 Command::RescheduleIntent {
583 reschedule_plan, ..
584 } => {
585 if reschedule_plan.is_some() {
586 write!(f, "RescheduleIntent(planned)")
587 } else {
588 write!(f, "RescheduleIntent")
589 }
590 }
591 Command::ReplaceStreamJob(plan) => {
592 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
593 }
594 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
595 Command::Throttle { .. } => write!(f, "Throttle"),
596 Command::CreateSubscription {
597 subscription_id, ..
598 } => write!(f, "CreateSubscription: {subscription_id}"),
599 Command::DropSubscription {
600 subscription_id, ..
601 } => write!(f, "DropSubscription: {subscription_id}"),
602 Command::AlterSubscriptionRetention {
603 subscription_id,
604 retention_second,
605 ..
606 } => write!(
607 f,
608 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
609 ),
610 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
611 Command::Refresh {
612 table_id,
613 associated_source_id,
614 } => write!(
615 f,
616 "Refresh: {} (source: {})",
617 table_id, associated_source_id
618 ),
619 Command::ListFinish {
620 table_id,
621 associated_source_id,
622 } => write!(
623 f,
624 "ListFinish: {} (source: {})",
625 table_id, associated_source_id
626 ),
627 Command::LoadFinish {
628 table_id,
629 associated_source_id,
630 } => write!(
631 f,
632 "LoadFinish: {} (source: {})",
633 table_id, associated_source_id
634 ),
635 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
636 Command::ResumeBackfill { target } => match target {
637 ResumeBackfillTarget::Job(job_id) => {
638 write!(f, "ResumeBackfill: job={job_id}")
639 }
640 ResumeBackfillTarget::Fragment(fragment_id) => {
641 write!(f, "ResumeBackfill: fragment={fragment_id}")
642 }
643 },
644 Command::InjectSourceOffsets {
645 source_id,
646 split_offsets,
647 } => write!(
648 f,
649 "InjectSourceOffsets: {} ({} splits)",
650 source_id,
651 split_offsets.len()
652 ),
653 }
654 }
655}
656
657impl Command {
658 pub fn pause() -> Self {
659 Self::Pause
660 }
661
662 pub fn resume() -> Self {
663 Self::Resume
664 }
665
666 pub fn need_checkpoint(&self) -> bool {
667 !matches!(self, Command::Resume)
669 }
670}
671
/// Residual command information retained after a barrier's responses have been
/// collected, driving post-commit bookkeeping.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command with no post-collect payload; carries only its display name.
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// The split assignment actually applied when the barrier was injected.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// The split assignment actually applied when the barrier was injected.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
703
704impl PostCollectCommand {
705 pub fn barrier() -> Self {
706 PostCollectCommand::Command("barrier".to_owned())
707 }
708
709 pub fn command_name(&self) -> &str {
710 match self {
711 PostCollectCommand::Command(name) => name.as_str(),
712 PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
713 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
714 PostCollectCommand::Reschedule { .. } => "Reschedule",
715 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
716 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
717 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
718 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
719 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
720 }
721 }
722}
723
724impl Display for PostCollectCommand {
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 f.write_str(self.command_name())
727 }
728}
729
/// Kind of a barrier tracked on the meta side.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The first barrier injected after (re)starting.
    Initial,
    /// A regular barrier that does not force a checkpoint.
    Barrier,
    /// A checkpoint barrier; carries a list of epochs (consumed by
    /// `collect_commit_epoch_info` when building change-log deltas).
    Checkpoint(Vec<u64>),
}
737
738impl BarrierKind {
739 pub fn to_protobuf(&self) -> PbBarrierKind {
740 match self {
741 BarrierKind::Initial => PbBarrierKind::Initial,
742 BarrierKind::Barrier => PbBarrierKind::Barrier,
743 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
744 }
745 }
746
747 pub fn is_checkpoint(&self) -> bool {
748 matches!(self, BarrierKind::Checkpoint(_))
749 }
750
751 pub fn is_initial(&self) -> bool {
752 matches!(self, BarrierKind::Initial)
753 }
754
755 pub fn as_str_name(&self) -> &'static str {
756 match self {
757 BarrierKind::Initial => "Initial",
758 BarrierKind::Barrier => "Barrier",
759 BarrierKind::Checkpoint(_) => "Checkpoint",
760 }
761 }
762}
763
/// Context of a collected barrier: everything needed to commit the barrier's
/// epoch after all compute nodes have reported completion.
pub(super) struct CommandContext {
    // Max log-store retention (seconds) per subscribed upstream mv table; used to
    // compute log truncation epochs at commit time.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    // Tables whose state is committed at this barrier's prev epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    // Tracing span kept alive for the lifetime of this barrier.
    _span: tracing::Span,
}
782
783impl std::fmt::Debug for CommandContext {
784 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
785 f.debug_struct("CommandContext")
786 .field("barrier_info", &self.barrier_info)
787 .field("command", &self.command.command_name())
788 .finish()
789 }
790}
791
impl CommandContext {
    /// Bundles barrier metadata with the post-collect command and the tables
    /// whose state is committed at this barrier's epoch.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Computes a log-store truncation epoch: `prev_epoch`'s wall-clock time
    /// minus `retention_second`. Falls back to `prev_epoch` itself (i.e. no
    /// truncation horizon in the past) when the subtraction yields an
    /// unrepresentable timestamp.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Folds the compute nodes' barrier-complete responses into `info`, the
    /// accumulated commit payload for this checkpoint: synced SSTs, table
    /// watermarks, change-log deltas, vector-index deltas, and newly created
    /// table-fragment registrations.
    ///
    /// Must only be called for a checkpoint barrier — the `must_match!` below
    /// panics otherwise.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created job registers all its state tables (internal tables plus
        // the mv table, if any) as one new table-fragment group. Snapshot-backfill
        // creation is asserted away here — it commits through a different path.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // Per upstream-mv log store, keep the smallest (most conservative) truncate
        // epoch over all subscriptions and pinned backfills.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Subscriptions bound the truncate epoch by their retention window.
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // In-progress backfills pin the log at their backfill epoch so the data
        // they still read is not truncated away.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every involved table commits at this barrier's prev epoch, exactly once.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A job that creates a vector index additionally records the index's init config.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
921
922impl Command {
923 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
925 {
926 {
927 if !is_currently_paused {
930 Some(Mutation::Pause(PauseMutation {}))
931 } else {
932 None
933 }
934 }
935 }
936 }
937
938 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
940 {
941 {
942 if is_currently_paused {
944 Some(Mutation::Resume(ResumeMutation {}))
945 } else {
946 None
947 }
948 }
949 }
950 }
951
952 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
954 {
955 {
956 let mut diff = HashMap::new();
957
958 for actor_splits in split_assignment.values() {
959 diff.extend(actor_splits.clone());
960 }
961
962 Mutation::Splits(SourceChangeSplitMutation {
963 actor_splits: build_actor_connector_splits(&diff),
964 })
965 }
966 }
967 }
968
969 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
971 {
972 {
973 let config = config.clone();
974 Mutation::Throttle(ThrottleMutation {
975 fragment_throttle: config,
976 })
977 }
978 }
979 }
980
981 pub(super) fn drop_streaming_jobs_to_mutation(
983 actors: &Vec<ActorId>,
984 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
985 ) -> Mutation {
986 {
987 Mutation::Stop(StopMutation {
988 actors: actors.clone(),
989 dropped_sink_fragments: dropped_sink_fragment_by_targets
990 .values()
991 .flatten()
992 .cloned()
993 .collect(),
994 })
995 }
996 }
997
    /// Builds the `Add` mutation that brings a newly created streaming job online:
    /// dispatchers for existing upstream actors, actor split assignments, implicit
    /// snapshot-backfill subscriptions, backfill pause set, CDC snapshot splits,
    /// and (for sink-into-table) the new upstream-sink wiring.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                let added_actors = stream_job_fragments.actor_ids().collect();
                // Flatten the per-fragment split assignment into an actor-level map.
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // A snapshot-backfill job implicitly subscribes to every upstream mv
                // it backfills from, with the job itself as the subscriber.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes with upstream ordering dependencies start paused;
                // they are released later (see `StartFragmentBackfillMutation`).
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table: tell the downstream (table) fragment about its
                // new upstream sink fragment and that fragment's actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        // Exactly one of the new fragments must be the sink fragment.
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move out (extract) the dispatchers of existing upstream fragments
                    // that now feed the new job's fragments.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // New actors start paused iff the graph is currently paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1108
    /// Builds the update mutation for replacing a stream job's fragments: merge
    /// updates for fragments whose upstream is being swapped, dispatchers for the
    /// new fragments, split assignments, and reassigned CDC backfill splits.
    /// Delegates the final assembly to `generate_update_mutation_for_replace_table`.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Move out the merge updates of fragments whose upstreams are replaced.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                // Move out the dispatchers of upstream fragments feeding the new fragments.
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Ok(Self::generate_update_mutation_for_replace_table(
                    // Actor set of the old job, plus the original actors of any
                    // auto-refresh-schema sinks being replaced.
                    // NOTE(review): presumably the actors being dropped by the update —
                    // confirm in `generate_update_mutation_for_replace_table`.
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1160
    /// Translates a set of fragment [`Reschedule`]s into a single
    /// [`Mutation::Update`] barrier mutation.
    ///
    /// The resulting mutation carries:
    /// - a `DispatcherUpdate` per surviving upstream actor of each rescheduled
    ///   fragment,
    /// - a `MergeUpdate` per surviving downstream actor,
    /// - vnode bitmap updates, the full set of dropped actors, connector source
    ///   split re-assignments, and CDC table snapshot split assignments.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                // For each (upstream fragment, dispatcher) pair touched by a
                // reschedule, build a `DispatcherUpdate` for every upstream actor.
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // The upstream fragment may itself be rescheduled at the
                        // same time.
                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // Only upstream actors that survive their own
                            // (possible) reschedule receive the newly added
                            // downstream actors; removed ones get an empty set.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // `try_insert` + `unwrap`: the (actor, dispatcher)
                            // key must be unique across all reschedules.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                // For each downstream fragment of a rescheduled fragment, build a
                // `MergeUpdate` for every downstream actor that is not itself
                // removed by a concurrent reschedule.
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Actors removed by the downstream fragment's own
                        // reschedule (if any) need no merge update.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                // Resolve the host once per worker,
                                                // then clone per actor.
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                // New vnode bitmap for every actor whose vnode ownership changes.
                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                // Union of all actors removed by any reschedule.
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Re-assign connector source splits and CDC backfill snapshot
                // splits for the rescheduled fragments.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // Reschedules never create brand-new dispatchers.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1342
1343 pub(super) fn create_subscription_to_mutation(
1345 upstream_mv_table_id: TableId,
1346 subscription_id: SubscriptionId,
1347 ) -> Mutation {
1348 {
1349 Mutation::Add(AddMutation {
1350 actor_dispatchers: Default::default(),
1351 added_actors: vec![],
1352 actor_splits: Default::default(),
1353 pause: false,
1354 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1355 upstream_mv_table_id,
1356 subscriber_id: subscription_id.as_subscriber_id(),
1357 }],
1358 backfill_nodes_to_pause: vec![],
1359 actor_cdc_table_snapshot_splits: None,
1360 new_upstream_sinks: Default::default(),
1361 })
1362 }
1363 }
1364
1365 pub(super) fn drop_subscription_to_mutation(
1367 upstream_mv_table_id: TableId,
1368 subscription_id: SubscriptionId,
1369 ) -> Mutation {
1370 {
1371 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1372 info: vec![SubscriptionUpstreamInfo {
1373 subscriber_id: subscription_id.as_subscriber_id(),
1374 upstream_mv_table_id,
1375 }],
1376 })
1377 }
1378 }
1379
1380 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1382 {
1383 {
1384 let mut connector_props_infos = HashMap::default();
1385 for (k, v) in config {
1386 connector_props_infos.insert(
1387 k.as_raw_id(),
1388 ConnectorPropsInfo {
1389 connector_props_info: v.clone(),
1390 },
1391 );
1392 }
1393 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1394 connector_props_infos,
1395 })
1396 }
1397 }
1398 }
1399
1400 pub(super) fn refresh_to_mutation(
1402 table_id: TableId,
1403 associated_source_id: SourceId,
1404 ) -> Mutation {
1405 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1406 table_id,
1407 associated_source_id,
1408 })
1409 }
1410
1411 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1413 Mutation::ListFinish(ListFinishMutation {
1414 associated_source_id,
1415 })
1416 }
1417
1418 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1420 Mutation::LoadFinish(LoadFinishMutation {
1421 associated_source_id,
1422 })
1423 }
1424
1425 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1427 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1428 source_id: source_id.as_raw_id(),
1429 })
1430 }
1431
1432 pub(super) fn resume_backfill_to_mutation(
1434 target: &ResumeBackfillTarget,
1435 database_info: &InflightDatabaseInfo,
1436 ) -> MetaResult<Option<Mutation>> {
1437 {
1438 {
1439 let fragment_ids: HashSet<_> = match target {
1440 ResumeBackfillTarget::Job(job_id) => {
1441 database_info.backfill_fragment_ids_for_job(*job_id)?
1442 }
1443 ResumeBackfillTarget::Fragment(fragment_id) => {
1444 if !database_info.is_backfill_fragment(*fragment_id)? {
1445 return Err(MetaError::invalid_parameter(format!(
1446 "fragment {} is not a backfill node",
1447 fragment_id
1448 )));
1449 }
1450 HashSet::from([*fragment_id])
1451 }
1452 };
1453 if fragment_ids.is_empty() {
1454 warn!(
1455 ?target,
1456 "resume backfill command ignored because no backfill fragments found"
1457 );
1458 Ok(None)
1459 } else {
1460 Ok(Some(Mutation::StartFragmentBackfill(
1461 StartFragmentBackfillMutation {
1462 fragment_ids: fragment_ids.into_iter().collect(),
1463 },
1464 )))
1465 }
1466 }
1467 }
1468 }
1469
1470 pub(super) fn inject_source_offsets_to_mutation(
1472 source_id: SourceId,
1473 split_offsets: &HashMap<String, String>,
1474 ) -> Mutation {
1475 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1476 source_id: source_id.as_raw_id(),
1477 split_offsets: split_offsets.clone(),
1478 })
1479 }
1480
1481 pub(super) fn create_streaming_job_actors_to_create(
1483 info: &CreateStreamingJobCommandInfo,
1484 edges: &mut FragmentEdgeBuildResult,
1485 ) -> StreamJobActorsToCreate {
1486 {
1487 {
1488 let actors_to_create = info.stream_job_fragments.actors_to_create();
1489 edges.collect_actors_to_create(actors_to_create.map(
1490 |(fragment_id, node, actors)| {
1491 (
1492 fragment_id,
1493 node,
1494 actors,
1495 [], )
1497 },
1498 ))
1499 }
1500 }
1501 }
1502
    /// Builds the per-worker actor-creation plan for the actors newly created
    /// by a set of reschedules, resolving each new actor's upstream actor infos.
    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        {
            {
                // Resolve the upstream actors of every newly created actor,
                // taking concurrent upstream reschedules into account.
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                // worker id -> fragment id -> (fragment node, actors, subscribers)
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    // An actor without resolved upstreams gets an empty set.
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            // Materialize the fragment's plan node and its
                            // subscribers lazily, on the first actor seen for
                            // this (worker, fragment) pair.
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                map
            }
        }
    }
1553
    /// Builds the per-worker actor-creation plan for a replace-stream-job
    /// command: the replacement job's new fragments plus, when present, the new
    /// sink fragments created for automatic schema refresh.
    pub(super) fn replace_stream_job_actors_to_create(
        replace_table: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &InflightDatabaseInfo,
    ) -> StreamJobActorsToCreate {
        {
            {
                // The new fragments inherit the subscribers of the job being
                // replaced.
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    // Actor locations are expected to be filled
                                    // in by this point; a missing one is a bug.
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    // Merge the sink fragments into the per-worker plan.
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                actors
            }
        }
    }
1601
1602 fn generate_update_mutation_for_replace_table(
1603 dropped_actors: impl IntoIterator<Item = ActorId>,
1604 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1605 dispatchers: FragmentActorDispatchers,
1606 split_assignment: &SplitAssignment,
1607 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1608 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1609 ) -> Option<Mutation> {
1610 let dropped_actors = dropped_actors.into_iter().collect();
1611
1612 let actor_new_dispatchers = dispatchers
1613 .into_values()
1614 .flatten()
1615 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1616 .collect();
1617
1618 let actor_splits = split_assignment
1619 .values()
1620 .flat_map(build_actor_connector_splits)
1621 .collect();
1622 Some(Mutation::Update(UpdateMutation {
1623 actor_new_dispatchers,
1624 merge_update: merge_updates.into_values().flatten().collect(),
1625 dropped_actors,
1626 actor_splits,
1627 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1628 sink_schema_change: auto_refresh_schema_sinks
1629 .as_ref()
1630 .into_iter()
1631 .flat_map(|sinks| {
1632 sinks.iter().map(|sink| {
1633 (
1634 sink.original_sink.id.as_raw_id(),
1635 PbSinkSchemaChange {
1636 original_schema: sink
1637 .original_sink
1638 .columns
1639 .iter()
1640 .map(|col| PbField {
1641 data_type: Some(
1642 col.column_desc
1643 .as_ref()
1644 .unwrap()
1645 .column_type
1646 .as_ref()
1647 .unwrap()
1648 .clone(),
1649 ),
1650 name: col.column_desc.as_ref().unwrap().name.clone(),
1651 })
1652 .collect(),
1653 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1654 fields: sink
1655 .newly_add_fields
1656 .iter()
1657 .map(|field| field.to_prost())
1658 .collect(),
1659 })),
1660 },
1661 )
1662 })
1663 })
1664 .collect(),
1665 ..Default::default()
1666 }))
1667 }
1668}
1669
impl Command {
    /// Collects, for every downstream actor reachable from the given
    /// dispatchers, the set of its upstream actors (grouped by upstream
    /// fragment) together with their host addresses.
    ///
    /// `actor_dispatchers` supplies, per upstream fragment, its actors and
    /// their dispatchers. When `reschedule_dispatcher_update` is provided, the
    /// newly added actors of each reschedule additionally receive the surviving
    /// upstream actors of their upstream fragments.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Pass 1: invert the dispatcher edges — for every downstream actor an
        // upstream actor dispatches to, record that upstream actor's info.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Pass 2 (reschedule only): every actor newly added by a reschedule
        // also gets each surviving upstream actor registered as an upstream.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may be rescheduled concurrently.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Upstream actors being removed are skipped.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}