1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49 PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50 StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51 UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71 StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76 build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// The reschedule of a single fragment: actors added/removed, vnode
/// re-balancing for surviving actors, and the updates required on the
/// fragment's upstream dispatchers and downstream merges.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors to be created, grouped by the worker each is placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors of this fragment to be removed.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmap for each actor whose vnode ownership changes.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(fragment_id, dispatcher_id)` of every upstream dispatcher that feeds
    /// this fragment and therefore needs a `DispatcherUpdate`.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New actor mapping for upstream hash dispatchers, when the hash
    /// distribution of this fragment changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Fragments downstream of this one that need a `MergeUpdate`.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source split assignment per actor after the reschedule.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The newly created actors (with dispatchers) and the worker each runs on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A fully-resolved reschedule: the per-fragment [`Reschedule`]s plus the
/// actor set of every fragment involved.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule descriptions.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// The actors of each fragment touched by the plan.
    // NOTE(review): whether this is the pre- or post-reschedule actor set is
    // not visible here — confirm at the producer of this plan.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Metadata loaded for planning a reschedule: the fragment contexts of the
/// affected jobs plus topology lookups around them.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment contexts of the jobs under reschedule.
    pub loaded: LoadedFragmentContext,
    /// Extra catalog information per streaming job.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments and the dispatcher type on each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments and the dispatcher type on each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation rows keyed by `(source_fragment_id, target_fragment_id)`.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131 pub fn empty() -> Self {
132 Self {
133 loaded: LoadedFragmentContext::default(),
134 job_extra_info: HashMap::new(),
135 upstream_fragments: HashMap::new(),
136 downstream_fragments: HashMap::new(),
137 downstream_relations: HashMap::new(),
138 }
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.loaded.is_empty()
143 }
144
145 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146 let loaded = self.loaded.for_database(database_id)?;
147 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148 let fragment_ids: HashSet<FragmentId> = loaded
151 .job_fragments
152 .values()
153 .flat_map(|fragments| fragments.keys().copied())
154 .collect();
155
156 let job_extra_info = self
157 .job_extra_info
158 .iter()
159 .filter(|(job_id, _)| job_ids.contains(*job_id))
160 .map(|(job_id, info)| (*job_id, info.clone()))
161 .collect();
162
163 let upstream_fragments = self
164 .upstream_fragments
165 .iter()
166 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168 .collect();
169
170 let downstream_fragments = self
171 .downstream_fragments
172 .iter()
173 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175 .collect();
176
177 let downstream_relations = self
178 .downstream_relations
179 .iter()
180 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184 .map(|(key, relation)| (*key, relation.clone()))
185 .collect();
186
187 Some(Self {
188 loaded,
189 job_extra_info,
190 upstream_fragments,
191 downstream_fragments,
192 downstream_relations,
193 })
194 }
195
196 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199 let Self {
200 loaded,
201 job_extra_info,
202 upstream_fragments,
203 downstream_fragments,
204 downstream_relations,
205 } = self;
206
207 let mut contexts: HashMap<_, _> = loaded
208 .into_database_contexts()
209 .into_iter()
210 .map(|(database_id, loaded)| {
211 (
212 database_id,
213 Self {
214 loaded,
215 job_extra_info: HashMap::new(),
216 upstream_fragments: HashMap::new(),
217 downstream_fragments: HashMap::new(),
218 downstream_relations: HashMap::new(),
219 },
220 )
221 })
222 .collect();
223
224 if contexts.is_empty() {
225 return contexts;
226 }
227
228 let mut job_databases = HashMap::new();
229 let mut fragment_databases = HashMap::new();
230 for (&database_id, context) in &contexts {
231 for job_id in context.loaded.job_map.keys().copied() {
232 job_databases.insert(job_id, database_id);
233 }
234 for fragment_id in context
235 .loaded
236 .job_fragments
237 .values()
238 .flat_map(|fragments| fragments.keys().copied())
239 {
240 fragment_databases.insert(fragment_id, database_id);
241 }
242 }
243
244 for (job_id, info) in job_extra_info {
245 if let Some(database_id) = job_databases.get(&job_id).copied() {
246 contexts
247 .get_mut(&database_id)
248 .expect("database context should exist for job")
249 .job_extra_info
250 .insert(job_id, info);
251 }
252 }
253
254 for (fragment_id, upstreams) in upstream_fragments {
255 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256 contexts
257 .get_mut(&database_id)
258 .expect("database context should exist for fragment")
259 .upstream_fragments
260 .insert(fragment_id, upstreams);
261 }
262 }
263
264 for (fragment_id, downstreams) in downstream_fragments {
265 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266 contexts
267 .get_mut(&database_id)
268 .expect("database context should exist for fragment")
269 .downstream_fragments
270 .insert(fragment_id, downstreams);
271 }
272 }
273
274 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278 contexts
279 .get_mut(&database_id)
280 .expect("database context should exist for relation source")
281 .downstream_relations
282 .insert((source_fragment_id, target_fragment_id), relation);
283 }
284 }
285
286 contexts
287 }
288}
289
/// The plan for replacing a streaming job's fragments in place: the old
/// fragments to retire, the new ones to create, and how the surrounding
/// edges are rewired.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// The fragments being replaced.
    pub old_fragments: StreamJobFragments,
    /// The replacement fragments to create.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Resource group of the owning database.
    pub database_resource_group: String,
    /// Per downstream fragment, the old-upstream -> new-upstream fragment id
    /// mapping used to patch its merges.
    pub replace_upstream: FragmentReplaceUpstream,
    /// New downstream edges from existing upstream fragments to the
    /// replacement fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Source split plan for the replacement job.
    pub split_plan: ReplaceJobSplitPlan,
    /// The streaming job being replaced.
    pub streaming_job: StreamingJob,
    /// Catalog model row of the job.
    pub streaming_job_model: streaming_job::Model,
    /// Temporary job id used while the replacement is in flight.
    pub tmp_id: JobId,
    /// State tables that disappear together with the old fragments.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed as part of this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323 let mut fragment_replacements = HashMap::new();
324 for (upstream_fragment_id, new_upstream_fragment_id) in
325 self.replace_upstream.values().flatten()
326 {
327 {
328 let r =
329 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330 if let Some(r) = r {
331 assert_eq!(
332 *new_upstream_fragment_id, r,
333 "one fragment is replaced by multiple fragments"
334 );
335 }
336 }
337 }
338 fragment_replacements
339 }
340}
341
/// Everything needed to create a streaming job's fragments and actors.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// The fragments/actors to create (large; excluded from `Debug` output).
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// New downstream edges from existing upstream fragments to this job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Resource group of the owning database.
    pub database_resource_group: String,
    /// Initial source split assignment for the job's source actors.
    pub init_split_assignment: SourceSplitAssignment,
    /// SQL definition of the job.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering constraints between the job's fragments.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, when this is a CDC table job.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// State tables per fragment.
    // NOTE(review): how this mapping drives locality is not visible here — confirm.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// Catalog model row of the job.
    pub streaming_job_model: streaming_job::Model,
}
363
364impl StreamJobFragments {
365 pub(super) fn new_fragment_info<'a>(
368 &'a self,
369 stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
370 actor_location: &'a HashMap<ActorId, WorkerId>,
371 assignment: &'a SplitAssignment,
372 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
373 self.fragments.values().map(|fragment| {
374 (
375 fragment.fragment_id,
376 InflightFragmentInfo {
377 fragment_id: fragment.fragment_id,
378 distribution_type: fragment.distribution_type.into(),
379 fragment_type_mask: fragment.fragment_type_mask,
380 vnode_count: fragment.vnode_count(),
381 nodes: fragment.nodes.clone(),
382 actors: stream_actors
383 .get(&fragment.fragment_id)
384 .into_iter()
385 .flatten()
386 .map(|actor| {
387 (
388 actor.actor_id,
389 InflightActorInfo {
390 worker_id: actor_location[&actor.actor_id],
391 vnode_bitmap: actor.vnode_bitmap.clone(),
392 splits: assignment
393 .get(&fragment.fragment_id)
394 .and_then(|s| s.get(&actor.actor_id))
395 .cloned()
396 .unwrap_or_default(),
397 },
398 )
399 })
400 .collect(),
401 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
402 },
403 )
404 })
405 }
406}
407
/// Upstream-mv information for a snapshot-backfill job.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Maps each upstream mv table to the backfill epoch.
    // NOTE(review): the meaning of `None` (epoch not yet determined?) is not
    // visible in this chunk — confirm where the map is populated.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
415
/// How a streaming job is created.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Ordinary creation with regular backfill.
    Normal,
    /// A sink that writes into an existing table; carries the wiring to the
    /// target table's fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation backed by snapshot backfill of the upstream mvs.
    SnapshotBackfill(SnapshotBackfillInfo),
}
422
/// A command attached to a barrier: the graph change or control action that
/// takes effect as the barrier flows through the streaming graph.
#[derive(Debug)]
pub enum Command {
    /// A barrier with no attached graph change (forces a checkpointable barrier).
    Flush,

    /// Pauses all stream processing.
    Pause,

    /// Resumes stream processing after a pause.
    Resume,

    /// Drops a set of streaming jobs and stops their actors.
    DropStreamingJobs {
        /// The jobs being dropped.
        streaming_job_ids: HashSet<JobId>,
        /// State tables to unregister.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments removed together with the jobs.
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// For each target fragment, the dropped sink fragments that were
        /// writing into it.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a new streaming job.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// Snapshot-backfill info for upstream mvs in other databases.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Intent to reschedule; the concrete plan may be precomputed or not.
    RescheduleIntent {
        /// Metadata context needed for planning/applying the reschedule.
        context: RescheduleContext,
        /// `Some` when the reschedule has already been planned.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replaces an existing streaming job's fragments in place.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Applies a source split state change.
    SourceChangeSplit(SplitState),

    /// Applies per-fragment throttle configuration.
    Throttle {
        /// Jobs affected by the throttle change.
        jobs: HashSet<JobId>,
        /// New throttle config per fragment.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Creates a subscription on an upstream mv.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drops a subscription.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Changes a subscription's retention window.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Updates connector properties at runtime.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Triggers a refresh of a table backed by a source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals completion of the list phase of a refresh.
    // NOTE(review): list/load phase semantics inferred from names — confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals completion of the load phase of a refresh.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Resets a source.
    ResetSource {
        source_id: SourceId,
    },

    /// Resumes backfill for a whole job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Overrides a source's split offsets (split id -> offset).
    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}
562
/// The scope on which a backfill is resumed.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for every fragment of the job.
    Job(JobId),
    /// Resume backfill for a single fragment.
    Fragment(FragmentId),
}
568
569impl std::fmt::Display for Command {
571 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
572 match self {
573 Command::Flush => write!(f, "Flush"),
574 Command::Pause => write!(f, "Pause"),
575 Command::Resume => write!(f, "Resume"),
576 Command::DropStreamingJobs {
577 streaming_job_ids, ..
578 } => {
579 write!(
580 f,
581 "DropStreamingJobs: {}",
582 streaming_job_ids.iter().sorted().join(", ")
583 )
584 }
585 Command::CreateStreamingJob { info, .. } => {
586 write!(f, "CreateStreamingJob: {}", info.streaming_job)
587 }
588 Command::RescheduleIntent {
589 reschedule_plan, ..
590 } => {
591 if reschedule_plan.is_some() {
592 write!(f, "RescheduleIntent(planned)")
593 } else {
594 write!(f, "RescheduleIntent")
595 }
596 }
597 Command::ReplaceStreamJob(plan) => {
598 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
599 }
600 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
601 Command::Throttle { .. } => write!(f, "Throttle"),
602 Command::CreateSubscription {
603 subscription_id, ..
604 } => write!(f, "CreateSubscription: {subscription_id}"),
605 Command::DropSubscription {
606 subscription_id, ..
607 } => write!(f, "DropSubscription: {subscription_id}"),
608 Command::AlterSubscriptionRetention {
609 subscription_id,
610 retention_second,
611 ..
612 } => write!(
613 f,
614 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
615 ),
616 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
617 Command::Refresh {
618 table_id,
619 associated_source_id,
620 } => write!(
621 f,
622 "Refresh: {} (source: {})",
623 table_id, associated_source_id
624 ),
625 Command::ListFinish {
626 table_id,
627 associated_source_id,
628 } => write!(
629 f,
630 "ListFinish: {} (source: {})",
631 table_id, associated_source_id
632 ),
633 Command::LoadFinish {
634 table_id,
635 associated_source_id,
636 } => write!(
637 f,
638 "LoadFinish: {} (source: {})",
639 table_id, associated_source_id
640 ),
641 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
642 Command::ResumeBackfill { target } => match target {
643 ResumeBackfillTarget::Job(job_id) => {
644 write!(f, "ResumeBackfill: job={job_id}")
645 }
646 ResumeBackfillTarget::Fragment(fragment_id) => {
647 write!(f, "ResumeBackfill: fragment={fragment_id}")
648 }
649 },
650 Command::InjectSourceOffsets {
651 source_id,
652 split_offsets,
653 } => write!(
654 f,
655 "InjectSourceOffsets: {} ({} splits)",
656 source_id,
657 split_offsets.len()
658 ),
659 }
660 }
661}
662
663impl Command {
664 pub fn pause() -> Self {
665 Self::Pause
666 }
667
668 pub fn resume() -> Self {
669 Self::Resume
670 }
671
672 pub fn need_checkpoint(&self) -> bool {
673 !matches!(self, Command::Resume)
675 }
676}
677
/// What remains of a [`Command`] after its barrier has been collected from
/// all workers — the data used during epoch commit and post-barrier handling.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command needing no post-collection handling; carries only a display
    /// name (e.g. `"barrier"`).
    Command(String),
    /// Dropped jobs and the state tables to unregister.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    /// A created job, with the split assignment resolved at injection time.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        resolved_split_assignment: SplitAssignment,
    },
    /// The collected per-fragment reschedules.
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    /// A replaced job, with the split assignment resolved at injection time.
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        resolved_split_assignment: SplitAssignment,
    },
    /// The applied source split assignment.
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    /// The created subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    /// The applied connector property changes.
    ConnectorPropsChange(ConnectorPropsChange),
    /// The backfill resume target.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
709
710impl PostCollectCommand {
711 pub fn barrier() -> Self {
712 PostCollectCommand::Command("barrier".to_owned())
713 }
714
715 pub fn should_checkpoint(&self) -> bool {
716 match self {
717 PostCollectCommand::DropStreamingJobs { .. }
718 | PostCollectCommand::CreateStreamingJob { .. }
719 | PostCollectCommand::Reschedule { .. }
720 | PostCollectCommand::ReplaceStreamJob { .. }
721 | PostCollectCommand::SourceChangeSplit { .. }
722 | PostCollectCommand::CreateSubscription { .. }
723 | PostCollectCommand::ConnectorPropsChange(_)
724 | PostCollectCommand::ResumeBackfill { .. } => true,
725 PostCollectCommand::Command(_) => false,
726 }
727 }
728
729 pub fn command_name(&self) -> &str {
730 match self {
731 PostCollectCommand::Command(name) => name.as_str(),
732 PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
733 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
734 PostCollectCommand::Reschedule { .. } => "Reschedule",
735 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
736 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
737 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
738 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
739 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
740 }
741 }
742}
743
744impl Display for PostCollectCommand {
745 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746 f.write_str(self.command_name())
747 }
748}
749
/// The kind of a barrier, mirroring `PbBarrierKind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// The first barrier injected after bootstrap/recovery.
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// A checkpoint barrier, carrying the epochs folded into this checkpoint
    /// (consumed when building table change-log deltas).
    Checkpoint(Vec<u64>),
}
757
758impl BarrierKind {
759 pub fn to_protobuf(&self) -> PbBarrierKind {
760 match self {
761 BarrierKind::Initial => PbBarrierKind::Initial,
762 BarrierKind::Barrier => PbBarrierKind::Barrier,
763 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
764 }
765 }
766
767 pub fn is_checkpoint(&self) -> bool {
768 matches!(self, BarrierKind::Checkpoint(_))
769 }
770
771 pub fn is_initial(&self) -> bool {
772 matches!(self, BarrierKind::Initial)
773 }
774
775 pub fn as_str_name(&self) -> &'static str {
776 match self {
777 BarrierKind::Initial => "Initial",
778 BarrierKind::Barrier => "Barrier",
779 BarrierKind::Checkpoint(_) => "Checkpoint",
780 }
781 }
782}
783
impl BarrierInfo {
    /// Computes the epoch a subscriber's log can be truncated to:
    /// `prev_epoch`'s wall-clock time minus `retention_second`, converted
    /// back to an epoch.
    ///
    /// Falls back to `prev_epoch` itself (i.e. no retention window) when the
    /// subtraction yields a timestamp `Timestamptz` cannot represent.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
            return self.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }
}
795
impl Command {
    /// Folds the barrier-complete responses of a checkpoint barrier into the
    /// `CommitEpochInfo` that will be committed to hummock: synced SSTs,
    /// watermarks, change-log deltas, vector-index deltas, new table
    /// fragments, and log-store truncation epochs.
    ///
    /// Panics (via `must_match!`) if the barrier is not a checkpoint barrier.
    pub(super) fn collect_commit_epoch_info(
        database_info: &InflightDatabaseInfo,
        barrier_info: &PartialGraphBarrierInfo,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        // Aggregate the per-worker barrier responses.
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created job registers its mv table and internal tables as a
        // new table fragment (snapshot-backfill jobs are handled elsewhere —
        // asserted here).
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
                &barrier_info.post_collect_command
            {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each mv table, the smallest epoch its log store may be truncated
        // to (minimum over all subscriptions and pinned backfills).
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    // Keep the minimum: an earlier pin wins.
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Subscriptions pin the log for their retention window.
        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
            let truncate_epoch = barrier_info
                .barrier_info
                .get_truncate_epoch(max_retention)
                .0;
            update_truncate_epoch(mv_table_id, truncate_epoch);
        }
        // In-progress backfills pin the log at their backfill epoch.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        // Build change-log deltas; `must_match!` requires a checkpoint barrier.
        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every table of this partial graph commits at the barrier's prev epoch.
        let epoch = barrier_info.barrier_info.prev_epoch();
        for table_id in &barrier_info.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index records its init delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
            &barrier_info.post_collect_command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
901
902impl Command {
903 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
905 {
906 {
907 if !is_currently_paused {
910 Some(Mutation::Pause(PauseMutation {}))
911 } else {
912 None
913 }
914 }
915 }
916 }
917
918 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
920 {
921 {
922 if is_currently_paused {
924 Some(Mutation::Resume(ResumeMutation {}))
925 } else {
926 None
927 }
928 }
929 }
930 }
931
932 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
934 {
935 {
936 let mut diff = HashMap::new();
937
938 for actor_splits in split_assignment.values() {
939 diff.extend(actor_splits.clone());
940 }
941
942 Mutation::Splits(SourceChangeSplitMutation {
943 actor_splits: build_actor_connector_splits(&diff),
944 })
945 }
946 }
947 }
948
949 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
951 {
952 {
953 let config = config.clone();
954 Mutation::Throttle(ThrottleMutation {
955 fragment_throttle: config,
956 })
957 }
958 }
959 }
960
961 pub(super) fn drop_streaming_jobs_to_mutation(
963 actors: &Vec<ActorId>,
964 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
965 ) -> Mutation {
966 {
967 Mutation::Stop(StopMutation {
968 actors: actors.clone(),
969 dropped_sink_fragments: dropped_sink_fragment_by_targets
970 .values()
971 .flatten()
972 .cloned()
973 .collect(),
974 })
975 }
976 }
977
    /// Builds the `Add` barrier mutation for a newly created streaming job:
    /// the new actors, the dispatchers patched into upstream fragments,
    /// initial source splits, CDC snapshot splits, backfill pause points,
    /// implicit snapshot-backfill subscriptions, and (for sink-into-table)
    /// the new upstream-sink wiring of the target fragment.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // Every actor of the new job is announced as added.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs implicitly subscribe to the log of
                // every upstream mv they backfill from.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes with upstream ordering dependencies start paused.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, tell the downstream (target) fragment
                // about its new upstream sink actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move out of `edges` the dispatchers of upstream fragments
                    // that now point at the new job's fragments.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // New actors start paused iff the graph is currently paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1096
    /// Builds the `Update` mutation that swaps a job's fragments in place:
    /// merge updates for downstream fragments whose upstream is replaced,
    /// dispatchers for the replacement fragments, refreshed CDC backfill
    /// splits, and the actors to drop (old fragments plus the original
    /// fragments of auto-refreshed sinks).
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Move out the merge updates of fragments whose upstream is replaced.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                // Move out the dispatchers of upstream fragments that now feed
                // the replacement fragments.
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                // Re-assign CDC backfill splits for the replaced job, if any.
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    // All actors of the old fragments (and original
                    // auto-refresh sink fragments) are dropped.
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1146
    /// Translate a set of fragment [`Reschedule`]s into a single barrier
    /// [`Mutation::Update`].
    ///
    /// The mutation covers, in order of assembly:
    /// - dispatcher updates for surviving upstream actors (new hash mapping,
    ///   added/removed downstream actors),
    /// - merge updates for surviving downstream actors (added/removed upstream
    ///   actors),
    /// - vnode ownership bitmap changes,
    /// - the full set of dropped actors,
    /// - re-assigned source splits and CDC table snapshot splits.
    ///
    /// `database_info` is taken mutably for
    /// `may_assign_fragment_cdc_backfill_splits`; errors from that call are the
    /// only error path. Otherwise always returns `Ok(Some(..))`.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                // Pass 1: for every rescheduled fragment, tell each actor of each
                // upstream fragment how its dispatcher to this fragment changes.
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // The upstream fragment may itself be rescheduled.
                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // Upstream actors that are being removed by their own
                            // fragment's reschedule get no added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // (actor_id, dispatcher_id) must be unique across the
                            // reschedule set; a duplicate indicates a bug upstream.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                // Pass 2: for every downstream fragment, tell each surviving
                // downstream actor how its merge executor's upstream set changes.
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Actors removed by the downstream fragment's own reschedule
                        // must not receive a merge update.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        // Newly added upstream actors, with their
                                        // worker hosts resolved for remote exchange.
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                // New vnode-to-actor ownership bitmaps for actors whose vnode set
                // changed; actor ids are globally unique, hence the strict insert.
                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Re-assign connector source splits and CDC table snapshot splits
                // to the actors of each rescheduled fragment.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // Only fragments doing CDC backfill yield an assignment here.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // A reschedule never creates brand-new dispatchers, so this stays empty.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1328
1329 pub(super) fn create_subscription_to_mutation(
1331 upstream_mv_table_id: TableId,
1332 subscription_id: SubscriptionId,
1333 ) -> Mutation {
1334 {
1335 Mutation::Add(AddMutation {
1336 actor_dispatchers: Default::default(),
1337 added_actors: vec![],
1338 actor_splits: Default::default(),
1339 pause: false,
1340 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1341 upstream_mv_table_id,
1342 subscriber_id: subscription_id.as_subscriber_id(),
1343 }],
1344 backfill_nodes_to_pause: vec![],
1345 actor_cdc_table_snapshot_splits: None,
1346 new_upstream_sinks: Default::default(),
1347 })
1348 }
1349 }
1350
1351 pub(super) fn drop_subscription_to_mutation(
1353 upstream_mv_table_id: TableId,
1354 subscription_id: SubscriptionId,
1355 ) -> Mutation {
1356 {
1357 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1358 info: vec![SubscriptionUpstreamInfo {
1359 subscriber_id: subscription_id.as_subscriber_id(),
1360 upstream_mv_table_id,
1361 }],
1362 })
1363 }
1364 }
1365
1366 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1368 {
1369 {
1370 let mut connector_props_infos = HashMap::default();
1371 for (k, v) in config {
1372 connector_props_infos.insert(
1373 k.as_raw_id(),
1374 ConnectorPropsInfo {
1375 connector_props_info: v.clone(),
1376 },
1377 );
1378 }
1379 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1380 connector_props_infos,
1381 })
1382 }
1383 }
1384 }
1385
1386 pub(super) fn refresh_to_mutation(
1388 table_id: TableId,
1389 associated_source_id: SourceId,
1390 ) -> Mutation {
1391 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1392 table_id,
1393 associated_source_id,
1394 })
1395 }
1396
1397 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1399 Mutation::ListFinish(ListFinishMutation {
1400 associated_source_id,
1401 })
1402 }
1403
1404 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1406 Mutation::LoadFinish(LoadFinishMutation {
1407 associated_source_id,
1408 })
1409 }
1410
1411 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1413 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1414 source_id: source_id.as_raw_id(),
1415 })
1416 }
1417
1418 pub(super) fn resume_backfill_to_mutation(
1420 target: &ResumeBackfillTarget,
1421 database_info: &InflightDatabaseInfo,
1422 ) -> MetaResult<Option<Mutation>> {
1423 {
1424 {
1425 let fragment_ids: HashSet<_> = match target {
1426 ResumeBackfillTarget::Job(job_id) => {
1427 database_info.backfill_fragment_ids_for_job(*job_id)?
1428 }
1429 ResumeBackfillTarget::Fragment(fragment_id) => {
1430 if !database_info.is_backfill_fragment(*fragment_id)? {
1431 return Err(MetaError::invalid_parameter(format!(
1432 "fragment {} is not a backfill node",
1433 fragment_id
1434 )));
1435 }
1436 HashSet::from([*fragment_id])
1437 }
1438 };
1439 if fragment_ids.is_empty() {
1440 warn!(
1441 ?target,
1442 "resume backfill command ignored because no backfill fragments found"
1443 );
1444 Ok(None)
1445 } else {
1446 Ok(Some(Mutation::StartFragmentBackfill(
1447 StartFragmentBackfillMutation {
1448 fragment_ids: fragment_ids.into_iter().collect(),
1449 },
1450 )))
1451 }
1452 }
1453 }
1454 }
1455
1456 pub(super) fn inject_source_offsets_to_mutation(
1458 source_id: SourceId,
1459 split_offsets: &HashMap<String, String>,
1460 ) -> Mutation {
1461 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1462 source_id: source_id.as_raw_id(),
1463 split_offsets: split_offsets.clone(),
1464 })
1465 }
1466
1467 pub(super) fn create_streaming_job_actors_to_create(
1469 info: &CreateStreamingJobCommandInfo,
1470 edges: &mut FragmentEdgeBuildResult,
1471 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1472 actor_location: &HashMap<ActorId, WorkerId>,
1473 ) -> StreamJobActorsToCreate {
1474 {
1475 {
1476 edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1477 |fragment| {
1478 let actors = stream_actors
1479 .get(&fragment.fragment_id)
1480 .into_iter()
1481 .flatten()
1482 .map(|actor| (actor, actor_location[&actor.actor_id]));
1483 (
1484 fragment.fragment_id,
1485 &fragment.nodes,
1486 actors,
1487 [], )
1489 },
1490 ))
1491 }
1492 }
1493 }
1494
    /// Collect, per worker, the newly created actors of a reschedule together
    /// with each actor's resolved upstream actor infos and its dispatchers,
    /// grouped by fragment.
    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        {
            {
                // Resolve the upstream actor infos of every newly created actor,
                // taking the reschedules of upstream fragments into account.
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                // worker id -> fragment id -> (node plan, actors to build, subscribers)
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    // An absent entry means no upstream actors were resolved for
                    // this actor; fall back to an empty upstream set.
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            // Clone the fragment's node plan and snapshot its
                            // subscribers once per (worker, fragment) pair.
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                map
            }
        }
    }
1545
1546 pub(super) fn replace_stream_job_actors_to_create(
1548 replace_table: &ReplaceStreamJobPlan,
1549 edges: &mut FragmentEdgeBuildResult,
1550 database_info: &InflightDatabaseInfo,
1551 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1552 actor_location: &HashMap<ActorId, WorkerId>,
1553 ) -> StreamJobActorsToCreate {
1554 {
1555 {
1556 let mut actors = edges.collect_actors_to_create(
1557 replace_table
1558 .new_fragments
1559 .fragments
1560 .values()
1561 .map(|fragment| {
1562 let actors = stream_actors
1563 .get(&fragment.fragment_id)
1564 .into_iter()
1565 .flatten()
1566 .map(|actor| (actor, actor_location[&actor.actor_id]));
1567 (
1568 fragment.fragment_id,
1569 &fragment.nodes,
1570 actors,
1571 database_info
1572 .job_subscribers(replace_table.old_fragments.stream_job_id),
1573 )
1574 }),
1575 );
1576
1577 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1579 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1580 (
1581 sink.new_fragment.fragment_id,
1582 &sink.new_fragment.nodes,
1583 stream_actors
1584 .get(&sink.new_fragment.fragment_id)
1585 .into_iter()
1586 .flatten()
1587 .map(|actor| (actor, actor_location[&actor.actor_id])),
1588 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1589 )
1590 }));
1591 for (worker_id, fragment_actors) in sink_actors {
1592 actors.entry(worker_id).or_default().extend(fragment_actors);
1593 }
1594 }
1595 actors
1596 }
1597 }
1598 }
1599
1600 fn generate_update_mutation_for_replace_table(
1601 dropped_actors: impl IntoIterator<Item = ActorId>,
1602 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1603 dispatchers: FragmentActorDispatchers,
1604 split_assignment: &SplitAssignment,
1605 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1606 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1607 ) -> Option<Mutation> {
1608 let dropped_actors = dropped_actors.into_iter().collect();
1609
1610 let actor_new_dispatchers = dispatchers
1611 .into_values()
1612 .flatten()
1613 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1614 .collect();
1615
1616 let actor_splits = split_assignment
1617 .values()
1618 .flat_map(build_actor_connector_splits)
1619 .collect();
1620 Some(Mutation::Update(UpdateMutation {
1621 actor_new_dispatchers,
1622 merge_update: merge_updates.into_values().flatten().collect(),
1623 dropped_actors,
1624 actor_splits,
1625 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1626 sink_schema_change: auto_refresh_schema_sinks
1627 .as_ref()
1628 .into_iter()
1629 .flat_map(|sinks| {
1630 sinks.iter().map(|sink| {
1631 let op = if !sink.removed_column_names.is_empty() {
1632 PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1633 column_names: sink.removed_column_names.clone(),
1634 })
1635 } else {
1636 PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1637 fields: sink
1638 .newly_add_fields
1639 .iter()
1640 .map(|field| field.to_prost())
1641 .collect(),
1642 })
1643 };
1644 (
1645 sink.original_sink.id.as_raw_id(),
1646 PbSinkSchemaChange {
1647 original_schema: sink
1648 .original_sink
1649 .columns
1650 .iter()
1651 .map(|col| PbField {
1652 data_type: Some(
1653 col.column_desc
1654 .as_ref()
1655 .unwrap()
1656 .column_type
1657 .as_ref()
1658 .unwrap()
1659 .clone(),
1660 ),
1661 name: col.column_desc.as_ref().unwrap().name.clone(),
1662 })
1663 .collect(),
1664 op: Some(op),
1665 },
1666 )
1667 })
1668 })
1669 .collect(),
1670 ..Default::default()
1671 }))
1672 }
1673}
1674
impl Command {
    /// Resolve, for each downstream actor, the upstream actors it should
    /// register as inputs, keyed by upstream fragment id.
    ///
    /// Two sources are merged:
    /// 1. the explicit `actor_dispatchers` iterator — each dispatcher's
    ///    `downstream_actor_id` list names the receiving actors; and
    /// 2. when `reschedule_dispatcher_update` is given, the actors newly added
    ///    by a reschedule — every surviving (non-removed) upstream actor is
    ///    wired to every added downstream actor.
    ///
    /// Upstream hosts are resolved from actor worker locations via
    /// `control_stream_manager`.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Pass 1: record every upstream -> downstream edge declared by the
        // provided dispatchers.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Pass 2: wire surviving upstream actors to the downstream actors newly
        // added by each reschedule.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; used below
                    // to skip its removed actors.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Actors removed by the upstream fragment's own reschedule
                        // contribute no edges.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}