1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49 PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50 StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51 UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70 FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71 StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76 build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// Per-fragment description of a reschedule.
///
/// The mutation-building code in this file reads `added_actors`, `removed_actors`,
/// `upstream_fragment_dispatcher_ids`, `upstream_dispatcher_mapping` and
/// `downstream_fragment_ids` to produce dispatcher updates and merge updates.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors added to the fragment, grouped by the worker that hosts them.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from the fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmaps keyed by actor.
    /// NOTE(review): presumably the new bitmaps of actors whose vnode assignment changes — confirm.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher)` pairs: every actor of each listed upstream fragment
    /// receives a dispatcher update for the paired dispatcher id.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Mapping sent as the `hash_mapping` of those dispatcher updates; `None` leaves it unset.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose non-removed actors receive a merge update for this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source splits keyed by actor.
    /// NOTE(review): presumably the split assignment after the reschedule — confirm.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors (with their dispatchers) paired with a worker id, keyed by actor id.
    /// NOTE(review): presumably the actors created by this reschedule and their placement — confirm.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A set of per-fragment [`Reschedule`]s together with an actor listing per fragment.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// The reschedule to apply to each affected fragment.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Actor ids keyed by fragment.
    /// NOTE(review): presumably covers the upstream/downstream fragments referenced by
    /// `reschedules`, as the mutation builder looks them up with `expect` — confirm.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Loaded fragment state plus job/fragment lookup tables used when rescheduling.
///
/// The context can be restricted to one database (`for_database`) or split into one context
/// per database (`into_database_contexts`).
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment context; its `job_map` and `job_fragments` decide which jobs and
    /// fragments belong to a database when the context is filtered or split.
    pub loaded: LoadedFragmentContext,
    /// Extra information per streaming job.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, a map of fragment id to dispatcher type.
    /// NOTE(review): presumably the fragment's upstream fragments — confirm.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, a map of fragment id to dispatcher type.
    /// NOTE(review): presumably the fragment's downstream fragments — confirm.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation models keyed by `(source fragment, target fragment)`.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131 pub fn empty() -> Self {
132 Self {
133 loaded: LoadedFragmentContext::default(),
134 job_extra_info: HashMap::new(),
135 upstream_fragments: HashMap::new(),
136 downstream_fragments: HashMap::new(),
137 downstream_relations: HashMap::new(),
138 }
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.loaded.is_empty()
143 }
144
145 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146 let loaded = self.loaded.for_database(database_id)?;
147 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148 let fragment_ids: HashSet<FragmentId> = loaded
151 .job_fragments
152 .values()
153 .flat_map(|fragments| fragments.keys().copied())
154 .collect();
155
156 let job_extra_info = self
157 .job_extra_info
158 .iter()
159 .filter(|(job_id, _)| job_ids.contains(*job_id))
160 .map(|(job_id, info)| (*job_id, info.clone()))
161 .collect();
162
163 let upstream_fragments = self
164 .upstream_fragments
165 .iter()
166 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168 .collect();
169
170 let downstream_fragments = self
171 .downstream_fragments
172 .iter()
173 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175 .collect();
176
177 let downstream_relations = self
178 .downstream_relations
179 .iter()
180 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184 .map(|(key, relation)| (*key, relation.clone()))
185 .collect();
186
187 Some(Self {
188 loaded,
189 job_extra_info,
190 upstream_fragments,
191 downstream_fragments,
192 downstream_relations,
193 })
194 }
195
196 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199 let Self {
200 loaded,
201 job_extra_info,
202 upstream_fragments,
203 downstream_fragments,
204 downstream_relations,
205 } = self;
206
207 let mut contexts: HashMap<_, _> = loaded
208 .into_database_contexts()
209 .into_iter()
210 .map(|(database_id, loaded)| {
211 (
212 database_id,
213 Self {
214 loaded,
215 job_extra_info: HashMap::new(),
216 upstream_fragments: HashMap::new(),
217 downstream_fragments: HashMap::new(),
218 downstream_relations: HashMap::new(),
219 },
220 )
221 })
222 .collect();
223
224 if contexts.is_empty() {
225 return contexts;
226 }
227
228 let mut job_databases = HashMap::new();
229 let mut fragment_databases = HashMap::new();
230 for (&database_id, context) in &contexts {
231 for job_id in context.loaded.job_map.keys().copied() {
232 job_databases.insert(job_id, database_id);
233 }
234 for fragment_id in context
235 .loaded
236 .job_fragments
237 .values()
238 .flat_map(|fragments| fragments.keys().copied())
239 {
240 fragment_databases.insert(fragment_id, database_id);
241 }
242 }
243
244 for (job_id, info) in job_extra_info {
245 if let Some(database_id) = job_databases.get(&job_id).copied() {
246 contexts
247 .get_mut(&database_id)
248 .expect("database context should exist for job")
249 .job_extra_info
250 .insert(job_id, info);
251 }
252 }
253
254 for (fragment_id, upstreams) in upstream_fragments {
255 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256 contexts
257 .get_mut(&database_id)
258 .expect("database context should exist for fragment")
259 .upstream_fragments
260 .insert(fragment_id, upstreams);
261 }
262 }
263
264 for (fragment_id, downstreams) in downstream_fragments {
265 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266 contexts
267 .get_mut(&database_id)
268 .expect("database context should exist for fragment")
269 .downstream_fragments
270 .insert(fragment_id, downstreams);
271 }
272 }
273
274 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278 contexts
279 .get_mut(&database_id)
280 .expect("database context should exist for relation source")
281 .downstream_relations
282 .insert((source_fragment_id, target_fragment_id), relation);
283 }
284 }
285
286 contexts
287 }
288}
289
/// Everything needed to replace a streaming job's fragments with a new set of fragments.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job before the replacement; their actors are passed to the update
    /// mutation built for the replacement.
    pub old_fragments: StreamJobFragments,
    /// Fragments to create for the replacement.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// NOTE(review): presumably the resource group of the job's database — confirm.
    pub database_resource_group: String,
    /// Upstream replacement mapping; its values are flattened into
    /// `old upstream fragment -> new upstream fragment` by `fragment_replacements`, and its
    /// keys select the merge updates extracted for the mutation.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Keys select the dispatchers extracted for the mutation.
    /// NOTE(review): presumably upstream fragment -> downstream relations to the new fragments — confirm.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// NOTE(review): presumably how source splits are planned for the new fragments — confirm.
    pub split_plan: ReplaceJobSplitPlan,
    /// The streaming job being replaced; used when displaying the command.
    pub streaming_job: StreamingJob,
    /// Model row of the streaming job.
    pub streaming_job_model: streaming_job::Model,
    /// NOTE(review): presumably a temporary job id used while the replacement is in flight — confirm.
    pub tmp_id: JobId,
    /// NOTE(review): presumably state tables to drop once the replacement finishes — confirm.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Optional sink contexts; the `original_fragment` of each one contributes its actors to
    /// the update mutation built for the replacement.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323 let mut fragment_replacements = HashMap::new();
324 for (upstream_fragment_id, new_upstream_fragment_id) in
325 self.replace_upstream.values().flatten()
326 {
327 {
328 let r =
329 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330 if let Some(r) = r {
331 assert_eq!(
332 *new_upstream_fragment_id, r,
333 "one fragment is replaced by multiple fragments"
334 );
335 }
336 }
337 }
338 fragment_replacements
339 }
340}
341
/// Payload shared by the commands that create a streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the job to create; omitted from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Keys select which fragment dispatchers are moved into the `AddMutation`.
    /// NOTE(review): presumably upstream fragment -> downstream relations to the new job — confirm.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// NOTE(review): presumably the resource group of the job's database — confirm.
    pub database_resource_group: String,
    /// NOTE(review): presumably the initial source split assignment of the job — confirm.
    pub init_split_assignment: SourceSplitAssignment,
    /// NOTE(review): presumably the SQL definition of the job — confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    /// The job being created; provides the database id and the display name of the command.
    pub streaming_job: StreamingJob,
    /// Backfill ordering; nodes with backfill dependencies derived from it are sent as
    /// `backfill_nodes_to_pause` in the `AddMutation`.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// NOTE(review): presumably CDC table snapshot splits, when the job backfills a CDC table — confirm.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// State table ids keyed by fragment.
    /// NOTE(review): presumably for locality-provider fragments — confirm.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// Model row of the streaming job.
    pub streaming_job_model: streaming_job::Model,
    /// NOTE(review): presumably the refresh interval in seconds for refreshable jobs — confirm.
    pub refresh_interval_sec: Option<u64>,
}
365
366impl StreamJobFragments {
367 pub(super) fn new_fragment_info<'a>(
370 &'a self,
371 stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
372 actor_location: &'a HashMap<ActorId, WorkerId>,
373 assignment: &'a SplitAssignment,
374 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
375 self.fragments.values().map(|fragment| {
376 (
377 fragment.fragment_id,
378 InflightFragmentInfo {
379 fragment_id: fragment.fragment_id,
380 distribution_type: fragment.distribution_type.into(),
381 fragment_type_mask: fragment.fragment_type_mask,
382 vnode_count: fragment.vnode_count(),
383 nodes: fragment.nodes.clone(),
384 actors: stream_actors
385 .get(&fragment.fragment_id)
386 .into_iter()
387 .flatten()
388 .map(|actor| {
389 (
390 actor.actor_id,
391 InflightActorInfo {
392 worker_id: actor_location[&actor.actor_id],
393 vnode_bitmap: actor.vnode_bitmap.clone(),
394 splits: assignment
395 .get(&fragment.fragment_id)
396 .and_then(|s| s.get(&actor.actor_id))
397 .cloned()
398 .unwrap_or_default(),
399 },
400 )
401 })
402 .collect(),
403 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
404 },
405 )
406 })
407 }
408}
409
/// Upstream tables involved in a snapshot backfill.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Backfill epoch per upstream MV table id. The keys are the tables a creating job
    /// subscribes to via `subscriptions_to_add`.
    /// NOTE(review): `None` presumably means the epoch has not been decided yet — confirm.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
417
/// Snapshot backfill information paired with a refresh interval.
#[derive(Debug, Clone)]
pub struct BatchRefreshInfo {
    /// Upstream tables of the job; handled like the `SnapshotBackfill` case when building
    /// `subscriptions_to_add`.
    pub snapshot_backfill_info: SnapshotBackfillInfo,
    /// Refresh interval in seconds.
    pub refresh_interval_sec: u64,
}
423
/// How a streaming job is created; selects the extra payload of the resulting `AddMutation`.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// No extra subscriptions or upstream sinks are added.
    Normal,
    /// Adds the sink described by the payload as a new upstream sink of its downstream fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// Adds a subscription on every upstream MV table listed in the payload.
    SnapshotBackfill(SnapshotBackfillInfo),
    /// Adds subscriptions like `SnapshotBackfill`, using the nested snapshot backfill info.
    BatchRefresh(BatchRefreshInfo),
}
431
/// A command carried by a barrier.
///
/// NOTE(review): the descriptions below are limited to what this file shows about each
/// variant; confirm the full semantics against the barrier manager.
#[derive(Debug)]
pub enum Command {
    /// Carries no payload.
    /// NOTE(review): presumably a barrier that only flushes state — confirm.
    Flush,

    /// Pauses the streaming graph; turned into a `PauseMutation` unless it is already paused.
    Pause,

    /// Resumes the streaming graph; turned into a `ResumeMutation` only when it is paused.
    /// This is the only command for which `need_checkpoint` returns `false`.
    Resume,

    /// Drops the given streaming jobs.
    DropStreamingJobs {
        /// Jobs to drop; listed (sorted) when the command is displayed.
        streaming_job_ids: HashSet<JobId>,
        /// NOTE(review): presumably state tables to unregister together with the jobs — confirm.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Sink fragment ids grouped by a fragment id; all values are flattened into
        /// `dropped_sink_fragments` of the `StopMutation`.
        /// NOTE(review): presumably keyed by the target (downstream) fragment — confirm.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a streaming job.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// NOTE(review): presumably snapshot backfill info for upstream tables in other databases — confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// A reschedule request; displayed as `RescheduleIntent(planned)` once a plan is attached.
    RescheduleIntent {
        context: RescheduleContext,
        /// `None` until a concrete plan has been resolved for this intent.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replaces the fragments of a streaming job according to the plan.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Changes source split assignment.
    SourceChangeSplit(SplitState),

    /// Applies throttle configuration.
    Throttle {
        /// NOTE(review): presumably the jobs owning the throttled fragments — confirm.
        jobs: HashSet<JobId>,
        /// Throttle config per fragment; sent as `fragment_throttle` of the `ThrottleMutation`.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Creates a subscription on an upstream MV table.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// Retention in seconds.
        retention_second: u64,
    },

    /// Drops a subscription from an upstream MV table.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Changes the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// New retention in seconds.
        retention_second: u64,
    },

    /// Changes connector properties.
    ConnectorPropsChange(ConnectorPropsChange),

    /// NOTE(review): presumably starts a refresh of a refreshable table — confirm.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// NOTE(review): presumably signals that listing finished for a refresh — confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// NOTE(review): presumably signals that loading finished for a refresh — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// NOTE(review): presumably resets the state of the given source — confirm.
    ResetSource {
        source_id: SourceId,
    },

    /// Resumes backfill for a job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Injects offsets into a source.
    InjectSourceOffsets {
        source_id: SourceId,
        /// String-to-string map; only its size is shown when the command is displayed.
        /// NOTE(review): presumably split id -> offset — confirm.
        split_offsets: HashMap<String, String>,
    },
}
571
/// What a `ResumeBackfill` command applies to.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// A whole streaming job.
    Job(JobId),
    /// A single fragment.
    Fragment(FragmentId),
}
577
578impl std::fmt::Display for Command {
580 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
581 match self {
582 Command::Flush => write!(f, "Flush"),
583 Command::Pause => write!(f, "Pause"),
584 Command::Resume => write!(f, "Resume"),
585 Command::DropStreamingJobs {
586 streaming_job_ids, ..
587 } => {
588 write!(
589 f,
590 "DropStreamingJobs: {}",
591 streaming_job_ids.iter().sorted().join(", ")
592 )
593 }
594 Command::CreateStreamingJob { info, .. } => {
595 write!(f, "CreateStreamingJob: {}", info.streaming_job)
596 }
597 Command::RescheduleIntent {
598 reschedule_plan, ..
599 } => {
600 if reschedule_plan.is_some() {
601 write!(f, "RescheduleIntent(planned)")
602 } else {
603 write!(f, "RescheduleIntent")
604 }
605 }
606 Command::ReplaceStreamJob(plan) => {
607 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
608 }
609 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
610 Command::Throttle { .. } => write!(f, "Throttle"),
611 Command::CreateSubscription {
612 subscription_id, ..
613 } => write!(f, "CreateSubscription: {subscription_id}"),
614 Command::DropSubscription {
615 subscription_id, ..
616 } => write!(f, "DropSubscription: {subscription_id}"),
617 Command::AlterSubscriptionRetention {
618 subscription_id,
619 retention_second,
620 ..
621 } => write!(
622 f,
623 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
624 ),
625 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
626 Command::Refresh {
627 table_id,
628 associated_source_id,
629 } => write!(
630 f,
631 "Refresh: {} (source: {})",
632 table_id, associated_source_id
633 ),
634 Command::ListFinish {
635 table_id,
636 associated_source_id,
637 } => write!(
638 f,
639 "ListFinish: {} (source: {})",
640 table_id, associated_source_id
641 ),
642 Command::LoadFinish {
643 table_id,
644 associated_source_id,
645 } => write!(
646 f,
647 "LoadFinish: {} (source: {})",
648 table_id, associated_source_id
649 ),
650 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
651 Command::ResumeBackfill { target } => match target {
652 ResumeBackfillTarget::Job(job_id) => {
653 write!(f, "ResumeBackfill: job={job_id}")
654 }
655 ResumeBackfillTarget::Fragment(fragment_id) => {
656 write!(f, "ResumeBackfill: fragment={fragment_id}")
657 }
658 },
659 Command::InjectSourceOffsets {
660 source_id,
661 split_offsets,
662 } => write!(
663 f,
664 "InjectSourceOffsets: {} ({} splits)",
665 source_id,
666 split_offsets.len()
667 ),
668 }
669 }
670}
671
672impl Command {
673 pub fn pause() -> Self {
674 Self::Pause
675 }
676
677 pub fn resume() -> Self {
678 Self::Resume
679 }
680
681 pub fn need_checkpoint(&self) -> bool {
682 !matches!(self, Command::Resume)
684 }
685}
686
/// The part of a command that is kept with a barrier for post-collection handling.
///
/// NOTE(review): "post-collection" is inferred from the type name and from its use when
/// commit-epoch info is collected — confirm the exact lifecycle.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command identified only by its name (for example `"barrier"`); the only variant for
    /// which `should_checkpoint` returns `false`.
    Command(String),
    DropStreamingJobs,
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// NOTE(review): presumably the split assignment resolved when the barrier was injected — confirm.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        /// The reschedule applied to each affected fragment.
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// NOTE(review): presumably the split assignment resolved when the barrier was injected — confirm.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
715
716impl PostCollectCommand {
717 pub fn barrier() -> Self {
718 PostCollectCommand::Command("barrier".to_owned())
719 }
720
721 pub fn should_checkpoint(&self) -> bool {
722 match self {
723 PostCollectCommand::DropStreamingJobs
724 | PostCollectCommand::CreateStreamingJob { .. }
725 | PostCollectCommand::Reschedule { .. }
726 | PostCollectCommand::ReplaceStreamJob { .. }
727 | PostCollectCommand::SourceChangeSplit { .. }
728 | PostCollectCommand::CreateSubscription { .. }
729 | PostCollectCommand::ConnectorPropsChange(_)
730 | PostCollectCommand::ResumeBackfill { .. } => true,
731 PostCollectCommand::Command(_) => false,
732 }
733 }
734
735 pub fn command_name(&self) -> &str {
736 match self {
737 PostCollectCommand::Command(name) => name.as_str(),
738 PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
739 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
740 PostCollectCommand::Reschedule { .. } => "Reschedule",
741 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
742 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
743 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
744 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
745 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
746 }
747 }
748}
749
750impl Display for PostCollectCommand {
751 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
752 f.write_str(self.command_name())
753 }
754}
755
/// Kind of a barrier, convertible to its protobuf counterpart via `to_protobuf`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier. The carried epochs are handed to the change-log delta builder
    /// when commit-epoch info is collected.
    /// NOTE(review): presumably the epochs covered by this checkpoint — confirm.
    Checkpoint(Vec<u64>),
}
763
764impl BarrierKind {
765 pub fn to_protobuf(&self) -> PbBarrierKind {
766 match self {
767 BarrierKind::Initial => PbBarrierKind::Initial,
768 BarrierKind::Barrier => PbBarrierKind::Barrier,
769 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
770 }
771 }
772
773 pub fn is_checkpoint(&self) -> bool {
774 matches!(self, BarrierKind::Checkpoint(_))
775 }
776
777 pub fn is_initial(&self) -> bool {
778 matches!(self, BarrierKind::Initial)
779 }
780
781 pub fn as_str_name(&self) -> &'static str {
782 match self {
783 BarrierKind::Initial => "Initial",
784 BarrierKind::Barrier => "Barrier",
785 BarrierKind::Checkpoint(_) => "Checkpoint",
786 }
787 }
788}
789
790impl BarrierInfo {
791 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
792 let Some(truncate_timestamptz) = Timestamptz::from_secs(
793 self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
794 ) else {
795 warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
796 return self.prev_epoch.value();
797 };
798 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
799 }
800}
801
802impl Command {
803 pub(super) fn collect_commit_epoch_info(
804 database_info: &InflightDatabaseInfo,
805 barrier_info: &PartialGraphBarrierInfo,
806 info: &mut CommitEpochInfo,
807 resps: Vec<BarrierCompleteResponse>,
808 backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
809 ) {
810 let (
811 sst_to_context,
812 synced_ssts,
813 new_table_watermarks,
814 old_value_ssts,
815 vector_index_adds,
816 truncate_tables,
817 ) = collect_resp_info(resps);
818
819 let new_table_fragment_infos =
820 if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
821 &barrier_info.post_collect_command
822 {
823 assert!(!matches!(
824 job_type,
825 CreateStreamingJobType::SnapshotBackfill(_)
826 | CreateStreamingJobType::BatchRefresh(_)
827 ));
828 let table_fragments = &info.stream_job_fragments;
829 let mut table_ids: HashSet<_> =
830 table_fragments.internal_table_ids().into_iter().collect();
831 if let Some(mv_table_id) = table_fragments.mv_table_id() {
832 table_ids.insert(mv_table_id);
833 }
834
835 vec![NewTableFragmentInfo { table_ids }]
836 } else {
837 vec![]
838 };
839
840 let mut mv_log_store_truncate_epoch = HashMap::new();
841 let mut update_truncate_epoch =
843 |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
844 Entry::Occupied(mut entry) => {
845 let prev_truncate_epoch = entry.get_mut();
846 if truncate_epoch < *prev_truncate_epoch {
847 *prev_truncate_epoch = truncate_epoch;
848 }
849 }
850 Entry::Vacant(entry) => {
851 entry.insert(truncate_epoch);
852 }
853 };
854 for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
855 let truncate_epoch = barrier_info
856 .barrier_info
857 .get_truncate_epoch(max_retention)
858 .0;
859 update_truncate_epoch(mv_table_id, truncate_epoch);
860 }
861 for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
862 for mv_table_id in upstream_mv_table_ids {
863 update_truncate_epoch(mv_table_id, backfill_epoch);
864 }
865 }
866
867 let table_new_change_log = build_table_change_log_delta(
868 old_value_ssts.into_iter(),
869 synced_ssts.iter().map(|sst| &sst.sst_info),
870 must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
871 mv_log_store_truncate_epoch.into_iter(),
872 );
873
874 let epoch = barrier_info.barrier_info.prev_epoch();
875 for table_id in &barrier_info.table_ids_to_commit {
876 info.tables_to_commit
877 .try_insert(*table_id, epoch)
878 .expect("non duplicate");
879 }
880
881 info.sstables.extend(synced_ssts);
882 info.new_table_watermarks.extend(new_table_watermarks);
883 info.sst_to_context.extend(sst_to_context);
884 info.new_table_fragment_infos
885 .extend(new_table_fragment_infos);
886 info.change_log_delta.extend(table_new_change_log);
887 for (table_id, vector_index_adds) in vector_index_adds {
888 info.vector_index_delta
889 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
890 .expect("non-duplicate");
891 }
892 if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
893 &barrier_info.post_collect_command
894 && let Some(index_table) = collect_new_vector_index_info(job_info)
895 {
896 info.vector_index_delta
897 .try_insert(
898 index_table.id,
899 VectorIndexDelta::Init(PbVectorIndexInit {
900 info: Some(index_table.vector_index_info.unwrap()),
901 }),
902 )
903 .expect("non-duplicate");
904 }
905 info.truncate_tables.extend(truncate_tables);
906 }
907}
908
909impl Command {
910 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
912 {
913 {
914 if !is_currently_paused {
917 Some(Mutation::Pause(PauseMutation {}))
918 } else {
919 None
920 }
921 }
922 }
923 }
924
925 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
927 {
928 {
929 if is_currently_paused {
931 Some(Mutation::Resume(ResumeMutation {}))
932 } else {
933 None
934 }
935 }
936 }
937 }
938
939 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
941 {
942 {
943 let mut diff = HashMap::new();
944
945 for actor_splits in split_assignment.values() {
946 diff.extend(actor_splits.clone());
947 }
948
949 Mutation::Splits(SourceChangeSplitMutation {
950 actor_splits: build_actor_connector_splits(&diff),
951 })
952 }
953 }
954 }
955
956 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
958 {
959 {
960 let config = config.clone();
961 Mutation::Throttle(ThrottleMutation {
962 fragment_throttle: config,
963 })
964 }
965 }
966 }
967
968 pub(super) fn drop_streaming_jobs_to_mutation(
970 actors: &Vec<ActorId>,
971 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
972 ) -> Mutation {
973 {
974 Mutation::Stop(StopMutation {
975 actors: actors.clone(),
976 dropped_sink_fragments: dropped_sink_fragment_by_targets
977 .values()
978 .flatten()
979 .cloned()
980 .collect(),
981 })
982 }
983 }
984
985 pub(super) fn create_streaming_job_to_mutation(
987 info: &CreateStreamingJobCommandInfo,
988 job_type: &CreateStreamingJobType,
989 is_currently_paused: bool,
990 edges: &mut FragmentEdgeBuildResult,
991 control_stream_manager: &ControlStreamManager,
992 actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
993 split_assignment: &SplitAssignment,
994 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
995 actor_location: &HashMap<ActorId, WorkerId>,
996 ) -> MetaResult<Mutation> {
997 {
998 {
999 let CreateStreamingJobCommandInfo {
1000 stream_job_fragments,
1001 upstream_fragment_downstreams,
1002 fragment_backfill_ordering,
1003 streaming_job,
1004 ..
1005 } = info;
1006 let database_id = streaming_job.database_id();
1007 let added_actors: Vec<ActorId> = stream_actors
1008 .values()
1009 .flatten()
1010 .map(|actor| actor.actor_id)
1011 .collect();
1012 let actor_splits = split_assignment
1013 .values()
1014 .flat_map(build_actor_connector_splits)
1015 .collect();
1016 let subscriptions_to_add =
1017 if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
1018 | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
1019 snapshot_backfill_info,
1020 ..
1021 }) = job_type
1022 {
1023 snapshot_backfill_info
1024 .upstream_mv_table_id_to_backfill_epoch
1025 .keys()
1026 .map(|table_id| SubscriptionUpstreamInfo {
1027 subscriber_id: stream_job_fragments
1028 .stream_job_id()
1029 .as_subscriber_id(),
1030 upstream_mv_table_id: *table_id,
1031 })
1032 .collect()
1033 } else {
1034 Default::default()
1035 };
1036 let backfill_nodes_to_pause: Vec<_> =
1037 get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1038 .into_iter()
1039 .collect();
1040
1041 let new_upstream_sinks =
1042 if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1043 sink_fragment_id,
1044 sink_output_fields,
1045 project_exprs,
1046 new_sink_downstream,
1047 ..
1048 }) = job_type
1049 {
1050 let new_sink_actors = stream_actors
1051 .get(sink_fragment_id)
1052 .unwrap_or_else(|| {
1053 panic!("upstream sink fragment {sink_fragment_id} not exist")
1054 })
1055 .iter()
1056 .map(|actor| {
1057 let worker_id = actor_location[&actor.actor_id];
1058 PbActorInfo {
1059 actor_id: actor.actor_id,
1060 host: Some(control_stream_manager.host_addr(worker_id)),
1061 partial_graph_id: to_partial_graph_id(database_id, None),
1062 }
1063 });
1064 let new_upstream_sink = PbNewUpstreamSink {
1065 info: Some(PbUpstreamSinkInfo {
1066 upstream_fragment_id: *sink_fragment_id,
1067 sink_output_schema: sink_output_fields.clone(),
1068 project_exprs: project_exprs.clone(),
1069 }),
1070 upstream_actors: new_sink_actors.collect(),
1071 };
1072 HashMap::from([(
1073 new_sink_downstream.downstream_fragment_id,
1074 new_upstream_sink,
1075 )])
1076 } else {
1077 HashMap::new()
1078 };
1079
1080 let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
1081 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1082
1083 let add_mutation = AddMutation {
1084 actor_dispatchers: edges
1085 .dispatchers
1086 .extract_if(|fragment_id, _| {
1087 upstream_fragment_downstreams.contains_key(fragment_id)
1088 })
1089 .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1090 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1091 .collect(),
1092 added_actors,
1093 actor_splits,
1094 pause: is_currently_paused,
1096 subscriptions_to_add,
1097 backfill_nodes_to_pause,
1098 actor_cdc_table_snapshot_splits,
1099 new_upstream_sinks,
1100 };
1101
1102 Ok(Mutation::Add(add_mutation))
1103 }
1104 }
1105 }
1106
1107 pub(super) fn replace_stream_job_to_mutation(
1109 ReplaceStreamJobPlan {
1110 old_fragments,
1111 replace_upstream,
1112 upstream_fragment_downstreams,
1113 auto_refresh_schema_sinks,
1114 ..
1115 }: &ReplaceStreamJobPlan,
1116 edges: &mut FragmentEdgeBuildResult,
1117 database_info: &mut InflightDatabaseInfo,
1118 split_assignment: &SplitAssignment,
1119 ) -> MetaResult<Option<Mutation>> {
1120 {
1121 {
1122 let merge_updates = edges
1123 .merge_updates
1124 .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1125 .collect();
1126 let dispatchers = edges
1127 .dispatchers
1128 .extract_if(|fragment_id, _| {
1129 upstream_fragment_downstreams.contains_key(fragment_id)
1130 })
1131 .collect();
1132 let actor_cdc_table_snapshot_splits = database_info
1133 .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1134 .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1135 let old_fragments = old_fragments.fragments.keys().copied();
1136 let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
1137 .as_ref()
1138 .into_iter()
1139 .flat_map(|sinks| sinks.iter())
1140 .map(|sink| sink.original_fragment.fragment_id);
1141 Ok(Self::generate_update_mutation_for_replace_table(
1142 old_fragments
1143 .chain(auto_refresh_sink_fragment_ids)
1144 .flat_map(|fragment_id| {
1145 database_info.fragment(fragment_id).actors.keys().copied()
1146 }),
1147 merge_updates,
1148 dispatchers,
1149 split_assignment,
1150 actor_cdc_table_snapshot_splits,
1151 auto_refresh_schema_sinks.as_ref(),
1152 ))
1153 }
1154 }
1155 }
1156
1157 pub(super) fn reschedule_to_mutation(
1159 reschedules: &HashMap<FragmentId, Reschedule>,
1160 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1161 control_stream_manager: &ControlStreamManager,
1162 database_info: &mut InflightDatabaseInfo,
1163 ) -> MetaResult<Option<Mutation>> {
1164 {
1165 {
1166 let database_id = database_info.database_id;
1167 let mut dispatcher_update = HashMap::new();
1168 for reschedule in reschedules.values() {
1169 for &(upstream_fragment_id, dispatcher_id) in
1170 &reschedule.upstream_fragment_dispatcher_ids
1171 {
1172 let upstream_actor_ids = fragment_actors
1174 .get(&upstream_fragment_id)
1175 .expect("should contain");
1176
1177 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1178
1179 for &actor_id in upstream_actor_ids {
1181 let added_downstream_actor_id = if upstream_reschedule
1182 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1183 .unwrap_or(true)
1184 {
1185 reschedule
1186 .added_actors
1187 .values()
1188 .flatten()
1189 .cloned()
1190 .collect()
1191 } else {
1192 Default::default()
1193 };
1194 dispatcher_update
1196 .try_insert(
1197 (actor_id, dispatcher_id),
1198 DispatcherUpdate {
1199 actor_id,
1200 dispatcher_id,
1201 hash_mapping: reschedule
1202 .upstream_dispatcher_mapping
1203 .as_ref()
1204 .map(|m| m.to_protobuf()),
1205 added_downstream_actor_id,
1206 removed_downstream_actor_id: reschedule
1207 .removed_actors
1208 .iter()
1209 .cloned()
1210 .collect(),
1211 },
1212 )
1213 .unwrap();
1214 }
1215 }
1216 }
1217 let dispatcher_update = dispatcher_update.into_values().collect();
1218
1219 let mut merge_update = HashMap::new();
1220 for (&fragment_id, reschedule) in reschedules {
1221 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1222 let downstream_actor_ids = fragment_actors
1224 .get(&downstream_fragment_id)
1225 .expect("should contain");
1226
1227 let downstream_removed_actors: HashSet<_> = reschedules
1231 .get(&downstream_fragment_id)
1232 .map(|downstream_reschedule| {
1233 downstream_reschedule
1234 .removed_actors
1235 .iter()
1236 .copied()
1237 .collect()
1238 })
1239 .unwrap_or_default();
1240
1241 for &actor_id in downstream_actor_ids {
1243 if downstream_removed_actors.contains(&actor_id) {
1244 continue;
1245 }
1246
1247 merge_update
1249 .try_insert(
1250 (actor_id, fragment_id),
1251 MergeUpdate {
1252 actor_id,
1253 upstream_fragment_id: fragment_id,
1254 new_upstream_fragment_id: None,
1255 added_upstream_actors: reschedule
1256 .added_actors
1257 .iter()
1258 .flat_map(|(worker_id, actors)| {
1259 let host =
1260 control_stream_manager.host_addr(*worker_id);
1261 actors.iter().map(move |&actor_id| PbActorInfo {
1262 actor_id,
1263 host: Some(host.clone()),
1264 partial_graph_id: to_partial_graph_id(
1266 database_id,
1267 None,
1268 ),
1269 })
1270 })
1271 .collect(),
1272 removed_upstream_actor_id: reschedule
1273 .removed_actors
1274 .iter()
1275 .cloned()
1276 .collect(),
1277 },
1278 )
1279 .unwrap();
1280 }
1281 }
1282 }
1283 let merge_update = merge_update.into_values().collect();
1284
1285 let mut actor_vnode_bitmap_update = HashMap::new();
1286 for reschedule in reschedules.values() {
1287 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1289 let bitmap = bitmap.to_protobuf();
1290 actor_vnode_bitmap_update
1291 .try_insert(actor_id, bitmap)
1292 .unwrap();
1293 }
1294 }
1295 let dropped_actors = reschedules
1296 .values()
1297 .flat_map(|r| r.removed_actors.iter().copied())
1298 .collect();
1299 let mut actor_splits = HashMap::new();
1300 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1301 for (fragment_id, reschedule) in reschedules {
1302 for (actor_id, splits) in &reschedule.actor_splits {
1303 actor_splits.insert(
1304 *actor_id,
1305 ConnectorSplits {
1306 splits: splits.iter().map(ConnectorSplit::from).collect(),
1307 },
1308 );
1309 }
1310
1311 if let Some(assignment) =
1312 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1313 {
1314 actor_cdc_table_snapshot_splits.extend(assignment)
1315 }
1316 }
1317
1318 let actor_new_dispatchers = HashMap::new();
1320 let mutation = Mutation::Update(UpdateMutation {
1321 dispatcher_update,
1322 merge_update,
1323 actor_vnode_bitmap_update,
1324 dropped_actors,
1325 actor_splits,
1326 actor_new_dispatchers,
1327 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1328 splits: actor_cdc_table_snapshot_splits,
1329 }),
1330 sink_schema_change: Default::default(),
1331 subscriptions_to_drop: vec![],
1332 });
1333 tracing::debug!("update mutation: {mutation:?}");
1334 Ok(Some(mutation))
1335 }
1336 }
1337 }
1338
1339 pub(super) fn create_subscription_to_mutation(
1341 upstream_mv_table_id: TableId,
1342 subscription_id: SubscriptionId,
1343 ) -> Mutation {
1344 {
1345 Mutation::Add(AddMutation {
1346 actor_dispatchers: Default::default(),
1347 added_actors: vec![],
1348 actor_splits: Default::default(),
1349 pause: false,
1350 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1351 upstream_mv_table_id,
1352 subscriber_id: subscription_id.as_subscriber_id(),
1353 }],
1354 backfill_nodes_to_pause: vec![],
1355 actor_cdc_table_snapshot_splits: None,
1356 new_upstream_sinks: Default::default(),
1357 })
1358 }
1359 }
1360
1361 pub(super) fn drop_subscription_to_mutation(
1363 upstream_mv_table_id: TableId,
1364 subscription_id: SubscriptionId,
1365 ) -> Mutation {
1366 {
1367 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1368 info: vec![SubscriptionUpstreamInfo {
1369 subscriber_id: subscription_id.as_subscriber_id(),
1370 upstream_mv_table_id,
1371 }],
1372 })
1373 }
1374 }
1375
1376 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1378 {
1379 {
1380 let mut connector_props_infos = HashMap::default();
1381 for (k, v) in config {
1382 connector_props_infos.insert(
1383 k.as_raw_id(),
1384 ConnectorPropsInfo {
1385 connector_props_info: v.clone(),
1386 },
1387 );
1388 }
1389 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1390 connector_props_infos,
1391 })
1392 }
1393 }
1394 }
1395
1396 pub(super) fn refresh_to_mutation(
1398 table_id: TableId,
1399 associated_source_id: SourceId,
1400 ) -> Mutation {
1401 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1402 table_id,
1403 associated_source_id,
1404 })
1405 }
1406
1407 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1409 Mutation::ListFinish(ListFinishMutation {
1410 associated_source_id,
1411 })
1412 }
1413
1414 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1416 Mutation::LoadFinish(LoadFinishMutation {
1417 associated_source_id,
1418 })
1419 }
1420
1421 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1423 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1424 source_id: source_id.as_raw_id(),
1425 })
1426 }
1427
1428 pub(super) fn resume_backfill_to_mutation(
1430 target: &ResumeBackfillTarget,
1431 database_info: &InflightDatabaseInfo,
1432 ) -> MetaResult<Option<Mutation>> {
1433 {
1434 {
1435 let fragment_ids: HashSet<_> = match target {
1436 ResumeBackfillTarget::Job(job_id) => {
1437 database_info.backfill_fragment_ids_for_job(*job_id)?
1438 }
1439 ResumeBackfillTarget::Fragment(fragment_id) => {
1440 if !database_info.is_backfill_fragment(*fragment_id)? {
1441 return Err(MetaError::invalid_parameter(format!(
1442 "fragment {} is not a backfill node",
1443 fragment_id
1444 )));
1445 }
1446 HashSet::from([*fragment_id])
1447 }
1448 };
1449 if fragment_ids.is_empty() {
1450 warn!(
1451 ?target,
1452 "resume backfill command ignored because no backfill fragments found"
1453 );
1454 Ok(None)
1455 } else {
1456 Ok(Some(Mutation::StartFragmentBackfill(
1457 StartFragmentBackfillMutation {
1458 fragment_ids: fragment_ids.into_iter().collect(),
1459 },
1460 )))
1461 }
1462 }
1463 }
1464 }
1465
1466 pub(super) fn inject_source_offsets_to_mutation(
1468 source_id: SourceId,
1469 split_offsets: &HashMap<String, String>,
1470 ) -> Mutation {
1471 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1472 source_id: source_id.as_raw_id(),
1473 split_offsets: split_offsets.clone(),
1474 })
1475 }
1476
1477 pub(super) fn create_streaming_job_actors_to_create(
1479 info: &CreateStreamingJobCommandInfo,
1480 edges: &mut FragmentEdgeBuildResult,
1481 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1482 actor_location: &HashMap<ActorId, WorkerId>,
1483 ) -> StreamJobActorsToCreate {
1484 {
1485 {
1486 edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1487 |fragment| {
1488 let actors = stream_actors
1489 .get(&fragment.fragment_id)
1490 .into_iter()
1491 .flatten()
1492 .map(|actor| (actor, actor_location[&actor.actor_id]));
1493 (
1494 fragment.fragment_id,
1495 &fragment.nodes,
1496 actors,
1497 [], )
1499 },
1500 ))
1501 }
1502 }
1503 }
1504
1505 pub(super) fn reschedule_actors_to_create(
1507 reschedules: &HashMap<FragmentId, Reschedule>,
1508 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1509 database_info: &InflightDatabaseInfo,
1510 control_stream_manager: &ControlStreamManager,
1511 ) -> StreamJobActorsToCreate {
1512 {
1513 {
1514 let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1515 reschedules.iter().map(|(fragment_id, reschedule)| {
1516 (
1517 *fragment_id,
1518 reschedule.newly_created_actors.values().map(
1519 |((actor, dispatchers), _)| {
1520 (actor.actor_id, dispatchers.as_slice())
1521 },
1522 ),
1523 )
1524 }),
1525 Some((reschedules, fragment_actors)),
1526 database_info,
1527 control_stream_manager,
1528 );
1529 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1530 for (fragment_id, (actor, dispatchers), worker_id) in
1531 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1532 reschedule
1533 .newly_created_actors
1534 .values()
1535 .map(|(actors, status)| (*fragment_id, actors, status))
1536 })
1537 {
1538 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1539 map.entry(*worker_id)
1540 .or_default()
1541 .entry(fragment_id)
1542 .or_insert_with(|| {
1543 let node = database_info.fragment(fragment_id).nodes.clone();
1544 let subscribers =
1545 database_info.fragment_subscribers(fragment_id).collect();
1546 (node, vec![], subscribers)
1547 })
1548 .1
1549 .push((actor.clone(), upstreams, dispatchers.clone()));
1550 }
1551 map
1552 }
1553 }
1554 }
1555
1556 pub(super) fn replace_stream_job_actors_to_create(
1558 replace_table: &ReplaceStreamJobPlan,
1559 edges: &mut FragmentEdgeBuildResult,
1560 database_info: &InflightDatabaseInfo,
1561 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1562 actor_location: &HashMap<ActorId, WorkerId>,
1563 ) -> StreamJobActorsToCreate {
1564 {
1565 {
1566 let mut actors = edges.collect_actors_to_create(
1567 replace_table
1568 .new_fragments
1569 .fragments
1570 .values()
1571 .map(|fragment| {
1572 let actors = stream_actors
1573 .get(&fragment.fragment_id)
1574 .into_iter()
1575 .flatten()
1576 .map(|actor| (actor, actor_location[&actor.actor_id]));
1577 (
1578 fragment.fragment_id,
1579 &fragment.nodes,
1580 actors,
1581 database_info
1582 .job_subscribers(replace_table.old_fragments.stream_job_id),
1583 )
1584 }),
1585 );
1586
1587 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1589 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1590 (
1591 sink.new_fragment.fragment_id,
1592 &sink.new_fragment.nodes,
1593 stream_actors
1594 .get(&sink.new_fragment.fragment_id)
1595 .into_iter()
1596 .flatten()
1597 .map(|actor| (actor, actor_location[&actor.actor_id])),
1598 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1599 )
1600 }));
1601 for (worker_id, fragment_actors) in sink_actors {
1602 actors.entry(worker_id).or_default().extend(fragment_actors);
1603 }
1604 }
1605 actors
1606 }
1607 }
1608 }
1609
1610 fn generate_update_mutation_for_replace_table(
1611 dropped_actors: impl IntoIterator<Item = ActorId>,
1612 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1613 dispatchers: FragmentActorDispatchers,
1614 split_assignment: &SplitAssignment,
1615 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1616 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1617 ) -> Option<Mutation> {
1618 let dropped_actors = dropped_actors.into_iter().collect();
1619
1620 let actor_new_dispatchers = dispatchers
1621 .into_values()
1622 .flatten()
1623 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1624 .collect();
1625
1626 let actor_splits = split_assignment
1627 .values()
1628 .flat_map(build_actor_connector_splits)
1629 .collect();
1630 Some(Mutation::Update(UpdateMutation {
1631 actor_new_dispatchers,
1632 merge_update: merge_updates.into_values().flatten().collect(),
1633 dropped_actors,
1634 actor_splits,
1635 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1636 sink_schema_change: auto_refresh_schema_sinks
1637 .as_ref()
1638 .into_iter()
1639 .flat_map(|sinks| {
1640 sinks.iter().map(|sink| {
1641 let op = if !sink.removed_column_names.is_empty() {
1642 PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1643 column_names: sink.removed_column_names.clone(),
1644 })
1645 } else {
1646 PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1647 fields: sink
1648 .newly_add_fields
1649 .iter()
1650 .map(|field| field.to_prost())
1651 .collect(),
1652 })
1653 };
1654 (
1655 sink.original_sink.id.as_raw_id(),
1656 PbSinkSchemaChange {
1657 original_schema: sink
1658 .original_sink
1659 .columns
1660 .iter()
1661 .map(|col| PbField {
1662 data_type: Some(
1663 col.column_desc
1664 .as_ref()
1665 .unwrap()
1666 .column_type
1667 .as_ref()
1668 .unwrap()
1669 .clone(),
1670 ),
1671 name: col.column_desc.as_ref().unwrap().name.clone(),
1672 })
1673 .collect(),
1674 op: Some(op),
1675 },
1676 )
1677 })
1678 })
1679 .collect(),
1680 ..Default::default()
1681 }))
1682 }
1683}
1684
1685impl Command {
1686 #[expect(clippy::type_complexity)]
1687 pub(super) fn collect_database_partial_graph_actor_upstreams(
1688 actor_dispatchers: impl Iterator<
1689 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1690 >,
1691 reschedule_dispatcher_update: Option<(
1692 &HashMap<FragmentId, Reschedule>,
1693 &HashMap<FragmentId, HashSet<ActorId>>,
1694 )>,
1695 database_info: &InflightDatabaseInfo,
1696 control_stream_manager: &ControlStreamManager,
1697 ) -> HashMap<ActorId, ActorUpstreams> {
1698 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1699 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1700 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1701 for (upstream_actor_id, dispatchers) in upstream_actors {
1702 let upstream_actor_location =
1703 upstream_fragment.actors[&upstream_actor_id].worker_id;
1704 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1705 for downstream_actor_id in dispatchers
1706 .iter()
1707 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1708 {
1709 actor_upstreams
1710 .entry(*downstream_actor_id)
1711 .or_default()
1712 .entry(upstream_fragment_id)
1713 .or_default()
1714 .insert(
1715 upstream_actor_id,
1716 PbActorInfo {
1717 actor_id: upstream_actor_id,
1718 host: Some(upstream_actor_host.clone()),
1719 partial_graph_id: to_partial_graph_id(
1720 database_info.database_id,
1721 None,
1722 ),
1723 },
1724 );
1725 }
1726 }
1727 }
1728 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1729 for reschedule in reschedules.values() {
1730 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1731 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1732 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1733 for upstream_actor_id in fragment_actors
1734 .get(upstream_fragment_id)
1735 .expect("should exist")
1736 {
1737 let upstream_actor_location =
1738 upstream_fragment.actors[upstream_actor_id].worker_id;
1739 let upstream_actor_host =
1740 control_stream_manager.host_addr(upstream_actor_location);
1741 if let Some(upstream_reschedule) = upstream_reschedule
1742 && upstream_reschedule
1743 .removed_actors
1744 .contains(upstream_actor_id)
1745 {
1746 continue;
1747 }
1748 for (_, downstream_actor_id) in
1749 reschedule
1750 .added_actors
1751 .iter()
1752 .flat_map(|(worker_id, actors)| {
1753 actors.iter().map(|actor| (*worker_id, *actor))
1754 })
1755 {
1756 actor_upstreams
1757 .entry(downstream_actor_id)
1758 .or_default()
1759 .entry(*upstream_fragment_id)
1760 .or_default()
1761 .insert(
1762 *upstream_actor_id,
1763 PbActorInfo {
1764 actor_id: *upstream_actor_id,
1765 host: Some(upstream_actor_host.clone()),
1766 partial_graph_id: to_partial_graph_id(
1767 database_info.database_id,
1768 None,
1769 ),
1770 },
1771 );
1772 }
1773 }
1774 }
1775 }
1776 }
1777 actor_upstreams
1778 }
1779}