1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49 PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50 StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51 UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71 StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76 build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// Reschedule of a single fragment: which actors are added/removed, how vnode
/// bitmaps and source splits are redistributed, and which neighboring
/// fragments must update their dispatchers/merge executors accordingly.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Newly added actors, grouped by the worker they are placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmap for each surviving actor whose vnode ownership changes.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers in upstream fragments that point into this fragment and
    /// therefore need a `DispatcherUpdate`.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for upstream dispatchers, if the actor distribution changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors need a `MergeUpdate`.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source split assignment per actor after the reschedule.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Full definition (with dispatchers) and worker placement of each newly
    /// created actor.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A concrete reschedule plan: the per-fragment [`Reschedule`]s plus the full
/// actor set of every fragment involved.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule descriptions.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Actor sets of the involved fragments, used to resolve upstream and
    /// downstream actors when building the update mutation.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Catalog metadata pre-loaded for planning and applying a reschedule.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment contexts of the jobs involved in the reschedule.
    pub loaded: LoadedFragmentContext,
    /// Extra catalog info per streaming job.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Upstream fragments of each fragment, with the dispatcher type of the edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Downstream fragments of each fragment, with the dispatcher type of the edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation row for each (source fragment, target fragment) edge.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131 pub fn empty() -> Self {
132 Self {
133 loaded: LoadedFragmentContext::default(),
134 job_extra_info: HashMap::new(),
135 upstream_fragments: HashMap::new(),
136 downstream_fragments: HashMap::new(),
137 downstream_relations: HashMap::new(),
138 }
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.loaded.is_empty()
143 }
144
145 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146 let loaded = self.loaded.for_database(database_id)?;
147 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148 let fragment_ids: HashSet<FragmentId> = loaded
151 .job_fragments
152 .values()
153 .flat_map(|fragments| fragments.keys().copied())
154 .collect();
155
156 let job_extra_info = self
157 .job_extra_info
158 .iter()
159 .filter(|(job_id, _)| job_ids.contains(*job_id))
160 .map(|(job_id, info)| (*job_id, info.clone()))
161 .collect();
162
163 let upstream_fragments = self
164 .upstream_fragments
165 .iter()
166 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168 .collect();
169
170 let downstream_fragments = self
171 .downstream_fragments
172 .iter()
173 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175 .collect();
176
177 let downstream_relations = self
178 .downstream_relations
179 .iter()
180 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184 .map(|(key, relation)| (*key, relation.clone()))
185 .collect();
186
187 Some(Self {
188 loaded,
189 job_extra_info,
190 upstream_fragments,
191 downstream_fragments,
192 downstream_relations,
193 })
194 }
195
196 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199 let Self {
200 loaded,
201 job_extra_info,
202 upstream_fragments,
203 downstream_fragments,
204 downstream_relations,
205 } = self;
206
207 let mut contexts: HashMap<_, _> = loaded
208 .into_database_contexts()
209 .into_iter()
210 .map(|(database_id, loaded)| {
211 (
212 database_id,
213 Self {
214 loaded,
215 job_extra_info: HashMap::new(),
216 upstream_fragments: HashMap::new(),
217 downstream_fragments: HashMap::new(),
218 downstream_relations: HashMap::new(),
219 },
220 )
221 })
222 .collect();
223
224 if contexts.is_empty() {
225 return contexts;
226 }
227
228 let mut job_databases = HashMap::new();
229 let mut fragment_databases = HashMap::new();
230 for (&database_id, context) in &contexts {
231 for job_id in context.loaded.job_map.keys().copied() {
232 job_databases.insert(job_id, database_id);
233 }
234 for fragment_id in context
235 .loaded
236 .job_fragments
237 .values()
238 .flat_map(|fragments| fragments.keys().copied())
239 {
240 fragment_databases.insert(fragment_id, database_id);
241 }
242 }
243
244 for (job_id, info) in job_extra_info {
245 if let Some(database_id) = job_databases.get(&job_id).copied() {
246 contexts
247 .get_mut(&database_id)
248 .expect("database context should exist for job")
249 .job_extra_info
250 .insert(job_id, info);
251 }
252 }
253
254 for (fragment_id, upstreams) in upstream_fragments {
255 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256 contexts
257 .get_mut(&database_id)
258 .expect("database context should exist for fragment")
259 .upstream_fragments
260 .insert(fragment_id, upstreams);
261 }
262 }
263
264 for (fragment_id, downstreams) in downstream_fragments {
265 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266 contexts
267 .get_mut(&database_id)
268 .expect("database context should exist for fragment")
269 .downstream_fragments
270 .insert(fragment_id, downstreams);
271 }
272 }
273
274 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278 contexts
279 .get_mut(&database_id)
280 .expect("database context should exist for relation source")
281 .downstream_relations
282 .insert((source_fragment_id, target_fragment_id), relation);
283 }
284 }
285
286 contexts
287 }
288}
289
/// Plan for replacing the fragments of an existing streaming job (e.g. schema
/// change): the old fragments to drop, the new ones to create, and how the
/// surrounding fragments are rewired.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job before the replacement.
    pub old_fragments: StreamJobFragments,
    /// Fragments that replace them.
    pub new_fragments: StreamJobFragmentsToCreate,
    pub database_resource_group: String,
    /// Per-fragment upstream replacements (old fragment id -> new fragment id)
    /// that merge executors must switch to.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub split_plan: ReplaceJobSplitPlan,
    pub streaming_job: StreamingJob,
    pub streaming_job_model: streaming_job::Model,
    // Temporary id used for the new fragments while the replacement is in
    // flight — presumably swapped for the real job id on commit; confirm at caller.
    pub tmp_id: JobId,
    /// State tables dropped together with the old fragments.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed as part of this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323 let mut fragment_replacements = HashMap::new();
324 for (upstream_fragment_id, new_upstream_fragment_id) in
325 self.replace_upstream.values().flatten()
326 {
327 {
328 let r =
329 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330 if let Some(r) = r {
331 assert_eq!(
332 *new_upstream_fragment_id, r,
333 "one fragment is replaced by multiple fragments"
334 );
335 }
336 }
337 }
338 fragment_replacements
339 }
340}
341
/// Everything needed to build and handle the barrier command that creates a
/// streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments/actors of the new job; excluded from `Debug` output for brevity.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub database_resource_group: String,
    /// Initial source split assignment for the job's source actors.
    pub init_split_assignment: SourceSplitAssignment,
    /// The job's definition statement.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering; nodes with backfill dependencies start paused
    /// (see `create_streaming_job_to_mutation`).
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, if the job backfills from CDC.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    pub streaming_job_model: streaming_job::Model,
}
363
364impl StreamJobFragments {
365 pub(super) fn new_fragment_info<'a>(
368 &'a self,
369 stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
370 actor_location: &'a HashMap<ActorId, WorkerId>,
371 assignment: &'a SplitAssignment,
372 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
373 self.fragments.values().map(|fragment| {
374 (
375 fragment.fragment_id,
376 InflightFragmentInfo {
377 fragment_id: fragment.fragment_id,
378 distribution_type: fragment.distribution_type.into(),
379 fragment_type_mask: fragment.fragment_type_mask,
380 vnode_count: fragment.vnode_count(),
381 nodes: fragment.nodes.clone(),
382 actors: stream_actors
383 .get(&fragment.fragment_id)
384 .into_iter()
385 .flatten()
386 .map(|actor| {
387 (
388 actor.actor_id,
389 InflightActorInfo {
390 worker_id: actor_location[&actor.actor_id],
391 vnode_bitmap: actor.vnode_bitmap.clone(),
392 splits: assignment
393 .get(&fragment.fragment_id)
394 .and_then(|s| s.get(&actor.actor_id))
395 .cloned()
396 .unwrap_or_default(),
397 },
398 )
399 })
400 .collect(),
401 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
402 },
403 )
404 })
405 }
406}
407
/// Info for a snapshot-backfill job: the upstream MV tables it backfills from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    // Maps each upstream MV table to its pinned backfill epoch; presumably
    // `None` until the epoch has been determined — confirm at the fill-in site.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
415
/// How a streaming job is created, which determines the extra wiring added to
/// the `Add` mutation (see `Command::create_streaming_job_to_mutation`).
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Regular creation with no special wiring.
    Normal,
    /// A sink writing into an existing table; carries the info used to register
    /// the sink as a new upstream of the target table's fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation with snapshot backfill from upstream MVs; registers temporary
    /// subscriptions on the upstream MV tables.
    SnapshotBackfill(SnapshotBackfillInfo),
}
422
/// Commands carried by an injected barrier and handled by the barrier manager.
#[derive(Debug)]
pub enum Command {
    /// Plain barrier with no extra effect; still requires a checkpoint
    /// (see `need_checkpoint`).
    Flush,

    /// Pause all streaming actors; becomes a `PauseMutation` unless the graph
    /// is already paused.
    Pause,

    /// Resume all streaming actors; becomes a `ResumeMutation` only if the
    /// graph is currently paused. The only command that does not require a
    /// checkpoint.
    Resume,

    /// Drop streaming jobs: stop their actors and unregister their state.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        /// State tables to unregister together with the jobs.
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Target fragment -> sink fragments feeding it that are dropped; the
        /// sink fragment ids end up in the `StopMutation`.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Create a streaming job; turns into an `AddMutation`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// Snapshot-backfill info for upstream MVs in other databases.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Intent to reschedule; the concrete plan may not be computed yet.
    RescheduleIntent {
        context: RescheduleContext,
        /// `Some` once a concrete plan has been produced.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replace the fragments of an existing job (see [`ReplaceStreamJobPlan`]).
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Reassign source splits; turns into a `SourceChangeSplitMutation`.
    SourceChangeSplit(SplitState),

    /// Change throttle configuration of the given fragments.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Create a subscription on an upstream MV table.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drop a subscription from an upstream MV table.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Change the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Change connector properties of running connectors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Refresh a table backed by the given source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signal that the listing phase of a refresh finished.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signal that the loading phase of a refresh finished.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Reset a source's state.
    ResetSource {
        source_id: SourceId,
    },

    /// Resume a paused backfill for a whole job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Inject per-split offsets into a source.
    InjectSourceOffsets {
        source_id: SourceId,
        // Presumably split id -> offset; confirm against the consumer.
        split_offsets: HashMap<String, String>,
    },
}
563
/// Target of a [`Command::ResumeBackfill`]: either a whole job or a single
/// fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for the given job.
    Job(JobId),
    /// Resume backfill for the given fragment only.
    Fragment(FragmentId),
}
569
570impl std::fmt::Display for Command {
572 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
573 match self {
574 Command::Flush => write!(f, "Flush"),
575 Command::Pause => write!(f, "Pause"),
576 Command::Resume => write!(f, "Resume"),
577 Command::DropStreamingJobs {
578 streaming_job_ids, ..
579 } => {
580 write!(
581 f,
582 "DropStreamingJobs: {}",
583 streaming_job_ids.iter().sorted().join(", ")
584 )
585 }
586 Command::CreateStreamingJob { info, .. } => {
587 write!(f, "CreateStreamingJob: {}", info.streaming_job)
588 }
589 Command::RescheduleIntent {
590 reschedule_plan, ..
591 } => {
592 if reschedule_plan.is_some() {
593 write!(f, "RescheduleIntent(planned)")
594 } else {
595 write!(f, "RescheduleIntent")
596 }
597 }
598 Command::ReplaceStreamJob(plan) => {
599 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
600 }
601 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
602 Command::Throttle { .. } => write!(f, "Throttle"),
603 Command::CreateSubscription {
604 subscription_id, ..
605 } => write!(f, "CreateSubscription: {subscription_id}"),
606 Command::DropSubscription {
607 subscription_id, ..
608 } => write!(f, "DropSubscription: {subscription_id}"),
609 Command::AlterSubscriptionRetention {
610 subscription_id,
611 retention_second,
612 ..
613 } => write!(
614 f,
615 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
616 ),
617 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
618 Command::Refresh {
619 table_id,
620 associated_source_id,
621 } => write!(
622 f,
623 "Refresh: {} (source: {})",
624 table_id, associated_source_id
625 ),
626 Command::ListFinish {
627 table_id,
628 associated_source_id,
629 } => write!(
630 f,
631 "ListFinish: {} (source: {})",
632 table_id, associated_source_id
633 ),
634 Command::LoadFinish {
635 table_id,
636 associated_source_id,
637 } => write!(
638 f,
639 "LoadFinish: {} (source: {})",
640 table_id, associated_source_id
641 ),
642 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
643 Command::ResumeBackfill { target } => match target {
644 ResumeBackfillTarget::Job(job_id) => {
645 write!(f, "ResumeBackfill: job={job_id}")
646 }
647 ResumeBackfillTarget::Fragment(fragment_id) => {
648 write!(f, "ResumeBackfill: fragment={fragment_id}")
649 }
650 },
651 Command::InjectSourceOffsets {
652 source_id,
653 split_offsets,
654 } => write!(
655 f,
656 "InjectSourceOffsets: {} ({} splits)",
657 source_id,
658 split_offsets.len()
659 ),
660 }
661 }
662}
663
664impl Command {
665 pub fn pause() -> Self {
666 Self::Pause
667 }
668
669 pub fn resume() -> Self {
670 Self::Resume
671 }
672
673 pub fn need_checkpoint(&self) -> bool {
674 !matches!(self, Command::Resume)
676 }
677}
678
/// Slimmed-down form of a [`Command`] kept after its barrier has been injected
/// and collected; carries only what post-collection handling needs.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command with no dedicated post-collect handling, identified by name
    /// (e.g. `"barrier"` for a plain barrier — see [`PostCollectCommand::barrier`]).
    Command(String),
    DropStreamingJobs,
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// The split assignment as actually resolved at injection time.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// The split assignment as actually resolved at injection time.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
707
708impl PostCollectCommand {
709 pub fn barrier() -> Self {
710 PostCollectCommand::Command("barrier".to_owned())
711 }
712
713 pub fn should_checkpoint(&self) -> bool {
714 match self {
715 PostCollectCommand::DropStreamingJobs
716 | PostCollectCommand::CreateStreamingJob { .. }
717 | PostCollectCommand::Reschedule { .. }
718 | PostCollectCommand::ReplaceStreamJob { .. }
719 | PostCollectCommand::SourceChangeSplit { .. }
720 | PostCollectCommand::CreateSubscription { .. }
721 | PostCollectCommand::ConnectorPropsChange(_)
722 | PostCollectCommand::ResumeBackfill { .. } => true,
723 PostCollectCommand::Command(_) => false,
724 }
725 }
726
727 pub fn command_name(&self) -> &str {
728 match self {
729 PostCollectCommand::Command(name) => name.as_str(),
730 PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
731 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
732 PostCollectCommand::Reschedule { .. } => "Reschedule",
733 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
734 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
735 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
736 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
737 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
738 }
739 }
740}
741
742impl Display for PostCollectCommand {
743 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
744 f.write_str(self.command_name())
745 }
746}
747
/// Kind of a barrier, mirroring `PbBarrierKind` on the protobuf side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// The very first barrier of a (re)started graph.
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// A checkpoint barrier, carrying the epochs it checkpoints (consumed when
    /// building the table change-log delta in `collect_commit_epoch_info`).
    Checkpoint(Vec<u64>),
}
755
756impl BarrierKind {
757 pub fn to_protobuf(&self) -> PbBarrierKind {
758 match self {
759 BarrierKind::Initial => PbBarrierKind::Initial,
760 BarrierKind::Barrier => PbBarrierKind::Barrier,
761 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
762 }
763 }
764
765 pub fn is_checkpoint(&self) -> bool {
766 matches!(self, BarrierKind::Checkpoint(_))
767 }
768
769 pub fn is_initial(&self) -> bool {
770 matches!(self, BarrierKind::Initial)
771 }
772
773 pub fn as_str_name(&self) -> &'static str {
774 match self {
775 BarrierKind::Initial => "Initial",
776 BarrierKind::Barrier => "Barrier",
777 BarrierKind::Checkpoint(_) => "Checkpoint",
778 }
779 }
780}
781
782impl BarrierInfo {
783 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
784 let Some(truncate_timestamptz) = Timestamptz::from_secs(
785 self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
786 ) else {
787 warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
788 return self.prev_epoch.value();
789 };
790 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
791 }
792}
793
impl Command {
    /// Fold the `BarrierCompleteResponse`s of a fully collected checkpoint
    /// barrier into the pending [`CommitEpochInfo`].
    ///
    /// Gathers synced SSTs, table watermarks, vector-index deltas and change-log
    /// deltas from the responses; registers new table fragments for a
    /// just-created streaming job; and computes per-MV log-store truncate
    /// epochs from subscription retention and pinned backfill epochs.
    ///
    /// Panics (via `must_match!`) if the barrier is not a checkpoint barrier.
    pub(super) fn collect_commit_epoch_info(
        database_info: &InflightDatabaseInfo,
        barrier_info: &PartialGraphBarrierInfo,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        // Split the per-worker responses into the ingredients of the commit.
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created job contributes its internal tables (plus the MV
        // table, if any) as a new table-fragment group.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
                &barrier_info.post_collect_command
            {
                // Snapshot-backfill creation must not reach this path.
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // Minimum truncate epoch per MV table: a smaller epoch means the log
        // must be retained further back, so keep the minimum across all sources.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Retention from active subscriptions...
        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
            let truncate_epoch = barrier_info
                .barrier_info
                .get_truncate_epoch(max_retention)
                .0;
            update_truncate_epoch(mv_table_id, truncate_epoch);
        }
        // ...and epochs pinned by in-progress backfills.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Commit every table of this partial graph at the barrier's prev epoch;
        // a duplicate table id would indicate a bookkeeping bug.
        let epoch = barrier_info.barrier_info.prev_epoch();
        for table_id in &barrier_info.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally emits its Init delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
            &barrier_info.post_collect_command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
899
900impl Command {
901 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
903 {
904 {
905 if !is_currently_paused {
908 Some(Mutation::Pause(PauseMutation {}))
909 } else {
910 None
911 }
912 }
913 }
914 }
915
916 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
918 {
919 {
920 if is_currently_paused {
922 Some(Mutation::Resume(ResumeMutation {}))
923 } else {
924 None
925 }
926 }
927 }
928 }
929
930 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
932 {
933 {
934 let mut diff = HashMap::new();
935
936 for actor_splits in split_assignment.values() {
937 diff.extend(actor_splits.clone());
938 }
939
940 Mutation::Splits(SourceChangeSplitMutation {
941 actor_splits: build_actor_connector_splits(&diff),
942 })
943 }
944 }
945 }
946
947 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
949 {
950 {
951 let config = config.clone();
952 Mutation::Throttle(ThrottleMutation {
953 fragment_throttle: config,
954 })
955 }
956 }
957 }
958
959 pub(super) fn drop_streaming_jobs_to_mutation(
961 actors: &Vec<ActorId>,
962 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
963 ) -> Mutation {
964 {
965 Mutation::Stop(StopMutation {
966 actors: actors.clone(),
967 dropped_sink_fragments: dropped_sink_fragment_by_targets
968 .values()
969 .flatten()
970 .cloned()
971 .collect(),
972 })
973 }
974 }
975
    /// Build the `Add` mutation for a [`Command::CreateStreamingJob`] barrier:
    /// the new actors and their dispatchers, initial source splits, optional
    /// snapshot-backfill subscriptions and CDC snapshot splits, and — for
    /// sink-into-table — the new upstream sink registered on the target
    /// fragment.
    ///
    /// Consumes (via `extract_if`) the dispatcher edges of the new job's
    /// upstream fragments from `edges`.
    ///
    /// Panics if the sink fragment of a `SinkIntoTable` job is missing from
    /// `stream_actors`, or if an actor is missing from `actor_location`.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // All actors of the new job, across all fragments.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot backfill pins the upstream MVs by registering a
                // subscription per upstream MV table, subscribed by this job.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes with dependencies start paused and are resumed
                // later according to the ordering.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table: tell the target fragment about its new
                // upstream sink fragment and that fragment's actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Take (and remove) the dispatchers of the upstream
                    // fragments that now feed the new job.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // New actors start paused iff the graph is currently paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1094
    /// Build the `Update` mutation for a [`Command::ReplaceStreamJob`] barrier:
    /// merge updates for fragments switching to new upstreams, dispatchers for
    /// the upstream fragments of the new job, and reassigned CDC backfill
    /// splits.
    ///
    /// Consumes (via `extract_if`) the relevant merge updates and dispatchers
    /// from `edges`.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Merge updates for every fragment whose upstream is replaced.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                // Dispatchers of the upstream fragments feeding the new job.
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                // Original fragments of sinks being auto-refreshed are dropped
                // together with the old job fragments.
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    // Actors of all fragments being dropped by the replacement.
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1144
    /// Builds the `Update` barrier mutation for a reschedule (actor migration /
    /// parallelism change) of the given fragments.
    ///
    /// Collects, across all entries of `reschedules`:
    /// - `dispatcher_update`: for each actor of every upstream fragment, the
    ///   downstream actors added to / removed from the identified dispatcher,
    /// - `merge_update`: for each surviving actor of every downstream fragment,
    ///   the upstream actors added to / removed from its merge input,
    /// - vnode bitmap updates, dropped actors, source split assignments, and CDC
    ///   table snapshot split assignments.
    ///
    /// Always returns `Ok(Some(_))`; errors can only come from
    /// `may_assign_fragment_cdc_backfill_splits`.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Every actor of the upstream fragment gets a dispatcher update.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is itself removed by its own
                            // reschedule does not learn about newly added downstream
                            // actors; it only receives the removal list.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // (actor_id, dispatcher_id) must be unique across all
                            // reschedules; a duplicate indicates a planning bug.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream actors that are themselves removed by their own
                        // fragment's reschedule (if any) must not get merge updates.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // (actor_id, upstream fragment_id) pairs are unique; see
                            // the note on dispatcher_update above.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Surviving actors whose vnode assignment changed carry their new
                    // bitmap in the mutation.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // Re-assign CDC table snapshot (backfill) splits when the
                    // rescheduled fragment is a CDC backfill fragment.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // A reschedule only updates existing dispatchers; it never adds new ones.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1326
1327 pub(super) fn create_subscription_to_mutation(
1329 upstream_mv_table_id: TableId,
1330 subscription_id: SubscriptionId,
1331 ) -> Mutation {
1332 {
1333 Mutation::Add(AddMutation {
1334 actor_dispatchers: Default::default(),
1335 added_actors: vec![],
1336 actor_splits: Default::default(),
1337 pause: false,
1338 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1339 upstream_mv_table_id,
1340 subscriber_id: subscription_id.as_subscriber_id(),
1341 }],
1342 backfill_nodes_to_pause: vec![],
1343 actor_cdc_table_snapshot_splits: None,
1344 new_upstream_sinks: Default::default(),
1345 })
1346 }
1347 }
1348
1349 pub(super) fn drop_subscription_to_mutation(
1351 upstream_mv_table_id: TableId,
1352 subscription_id: SubscriptionId,
1353 ) -> Mutation {
1354 {
1355 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1356 info: vec![SubscriptionUpstreamInfo {
1357 subscriber_id: subscription_id.as_subscriber_id(),
1358 upstream_mv_table_id,
1359 }],
1360 })
1361 }
1362 }
1363
1364 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1366 {
1367 {
1368 let mut connector_props_infos = HashMap::default();
1369 for (k, v) in config {
1370 connector_props_infos.insert(
1371 k.as_raw_id(),
1372 ConnectorPropsInfo {
1373 connector_props_info: v.clone(),
1374 },
1375 );
1376 }
1377 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1378 connector_props_infos,
1379 })
1380 }
1381 }
1382 }
1383
1384 pub(super) fn refresh_to_mutation(
1386 table_id: TableId,
1387 associated_source_id: SourceId,
1388 ) -> Mutation {
1389 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1390 table_id,
1391 associated_source_id,
1392 })
1393 }
1394
1395 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1397 Mutation::ListFinish(ListFinishMutation {
1398 associated_source_id,
1399 })
1400 }
1401
1402 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1404 Mutation::LoadFinish(LoadFinishMutation {
1405 associated_source_id,
1406 })
1407 }
1408
1409 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1411 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1412 source_id: source_id.as_raw_id(),
1413 })
1414 }
1415
1416 pub(super) fn resume_backfill_to_mutation(
1418 target: &ResumeBackfillTarget,
1419 database_info: &InflightDatabaseInfo,
1420 ) -> MetaResult<Option<Mutation>> {
1421 {
1422 {
1423 let fragment_ids: HashSet<_> = match target {
1424 ResumeBackfillTarget::Job(job_id) => {
1425 database_info.backfill_fragment_ids_for_job(*job_id)?
1426 }
1427 ResumeBackfillTarget::Fragment(fragment_id) => {
1428 if !database_info.is_backfill_fragment(*fragment_id)? {
1429 return Err(MetaError::invalid_parameter(format!(
1430 "fragment {} is not a backfill node",
1431 fragment_id
1432 )));
1433 }
1434 HashSet::from([*fragment_id])
1435 }
1436 };
1437 if fragment_ids.is_empty() {
1438 warn!(
1439 ?target,
1440 "resume backfill command ignored because no backfill fragments found"
1441 );
1442 Ok(None)
1443 } else {
1444 Ok(Some(Mutation::StartFragmentBackfill(
1445 StartFragmentBackfillMutation {
1446 fragment_ids: fragment_ids.into_iter().collect(),
1447 },
1448 )))
1449 }
1450 }
1451 }
1452 }
1453
1454 pub(super) fn inject_source_offsets_to_mutation(
1456 source_id: SourceId,
1457 split_offsets: &HashMap<String, String>,
1458 ) -> Mutation {
1459 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1460 source_id: source_id.as_raw_id(),
1461 split_offsets: split_offsets.clone(),
1462 })
1463 }
1464
1465 pub(super) fn create_streaming_job_actors_to_create(
1467 info: &CreateStreamingJobCommandInfo,
1468 edges: &mut FragmentEdgeBuildResult,
1469 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1470 actor_location: &HashMap<ActorId, WorkerId>,
1471 ) -> StreamJobActorsToCreate {
1472 {
1473 {
1474 edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1475 |fragment| {
1476 let actors = stream_actors
1477 .get(&fragment.fragment_id)
1478 .into_iter()
1479 .flatten()
1480 .map(|actor| (actor, actor_location[&actor.actor_id]));
1481 (
1482 fragment.fragment_id,
1483 &fragment.nodes,
1484 actors,
1485 [], )
1487 },
1488 ))
1489 }
1490 }
1491 }
1492
    /// Builds the per-worker map of actors to create for a reschedule.
    ///
    /// First resolves, for every newly created actor, the upstream actor infos via
    /// `Self::collect_database_partial_graph_actor_upstreams`, then groups each new
    /// actor (with its upstreams and dispatchers) by target worker and fragment,
    /// attaching the fragment's node body and current subscribers once per fragment.
    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        {
            {
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                // worker id -> fragment id -> (fragment nodes, actors to create, subscribers)
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    // An actor without any upstream entry gets an empty upstream map.
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            // Lazily initialize the fragment entry with its node body
                            // and current subscribers.
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                map
            }
        }
    }
1543
    /// Collects the actors to create when replacing a streaming job's plan.
    ///
    /// Gathers the actors of all new fragments of the replacement plan, and, when
    /// the plan carries auto-refresh-schema sinks, additionally collects the actors
    /// of each sink's new fragment and merges them into the same per-worker map.
    pub(super) fn replace_stream_job_actors_to_create(
        replace_table: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &InflightDatabaseInfo,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> StreamJobActorsToCreate {
        {
            {
                let mut actors = edges.collect_actors_to_create(
                    replace_table
                        .new_fragments
                        .fragments
                        .values()
                        .map(|fragment| {
                            let actors = stream_actors
                                .get(&fragment.fragment_id)
                                .into_iter()
                                .flatten()
                                .map(|actor| (actor, actor_location[&actor.actor_id]));
                            (
                                fragment.fragment_id,
                                &fragment.nodes,
                                actors,
                                // Subscribers of the job being replaced carry over to
                                // the new fragments.
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        }),
                );

                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            stream_actors
                                .get(&sink.new_fragment.fragment_id)
                                .into_iter()
                                .flatten()
                                .map(|actor| (actor, actor_location[&actor.actor_id])),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    // Merge the sink fragments' actors into the per-worker map built
                    // from the main replacement fragments.
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                actors
            }
        }
    }
1597
1598 fn generate_update_mutation_for_replace_table(
1599 dropped_actors: impl IntoIterator<Item = ActorId>,
1600 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1601 dispatchers: FragmentActorDispatchers,
1602 split_assignment: &SplitAssignment,
1603 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1604 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1605 ) -> Option<Mutation> {
1606 let dropped_actors = dropped_actors.into_iter().collect();
1607
1608 let actor_new_dispatchers = dispatchers
1609 .into_values()
1610 .flatten()
1611 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1612 .collect();
1613
1614 let actor_splits = split_assignment
1615 .values()
1616 .flat_map(build_actor_connector_splits)
1617 .collect();
1618 Some(Mutation::Update(UpdateMutation {
1619 actor_new_dispatchers,
1620 merge_update: merge_updates.into_values().flatten().collect(),
1621 dropped_actors,
1622 actor_splits,
1623 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1624 sink_schema_change: auto_refresh_schema_sinks
1625 .as_ref()
1626 .into_iter()
1627 .flat_map(|sinks| {
1628 sinks.iter().map(|sink| {
1629 let op = if !sink.removed_column_names.is_empty() {
1630 PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1631 column_names: sink.removed_column_names.clone(),
1632 })
1633 } else {
1634 PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1635 fields: sink
1636 .newly_add_fields
1637 .iter()
1638 .map(|field| field.to_prost())
1639 .collect(),
1640 })
1641 };
1642 (
1643 sink.original_sink.id.as_raw_id(),
1644 PbSinkSchemaChange {
1645 original_schema: sink
1646 .original_sink
1647 .columns
1648 .iter()
1649 .map(|col| PbField {
1650 data_type: Some(
1651 col.column_desc
1652 .as_ref()
1653 .unwrap()
1654 .column_type
1655 .as_ref()
1656 .unwrap()
1657 .clone(),
1658 ),
1659 name: col.column_desc.as_ref().unwrap().name.clone(),
1660 })
1661 .collect(),
1662 op: Some(op),
1663 },
1664 )
1665 })
1666 })
1667 .collect(),
1668 ..Default::default()
1669 }))
1670 }
1671}
1672
impl Command {
    /// Collects, for every downstream actor, the upstream actor infos (id, host,
    /// partial-graph id) it should register per upstream fragment.
    ///
    /// Two sources are merged into the result:
    /// - `actor_dispatchers`: for each upstream fragment/actor with explicit
    ///   dispatchers, every dispatched-to downstream actor records the dispatching
    ///   actor as an upstream;
    /// - `reschedule_dispatcher_update` (optional): for a reschedule, every newly
    ///   added actor records all actors of its upstream fragments as upstreams,
    ///   skipping upstream actors removed by their own reschedule.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Resolve the upstream actor's host from its worker placement.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors that are removed by the upstream
                        // fragment's own reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        // Every newly added actor of this reschedule records the
                        // surviving upstream actor as one of its upstreams.
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}