use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
use risingwave_pb::catalog::CreateType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
    PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
    UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::InflightDatabaseInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::LoadedFragmentContext;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

/// The reschedule of a single fragment: which actors are added or removed,
/// how vnodes are redistributed, and which neighboring fragments must be
/// updated accordingly.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Newly added actors, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed by this reschedule.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmaps for the actors whose vnode ownership changes.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// Dispatchers on upstream fragments that must be updated to the new actor set.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for the upstream dispatchers, if the distribution changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors must be updated.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassignment of source splits to the actors of this fragment.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The newly created actors, each with its dispatchers and the worker it lives on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// The actors of each fragment involved in the reschedules.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}

/// Pre-loaded metadata needed to plan a reschedule: the loaded fragments per
/// job, per-job extra info, and the upstream/downstream fragment topology.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    pub loaded: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
128
129impl RescheduleContext {
130 pub fn empty() -> Self {
131 Self {
132 loaded: LoadedFragmentContext::default(),
133 job_extra_info: HashMap::new(),
134 upstream_fragments: HashMap::new(),
135 downstream_fragments: HashMap::new(),
136 downstream_relations: HashMap::new(),
137 }
138 }
139
140 pub fn is_empty(&self) -> bool {
141 self.loaded.is_empty()
142 }
143
144 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
145 let loaded = self.loaded.for_database(database_id)?;
146 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
147 let fragment_ids: HashSet<FragmentId> = loaded
150 .job_fragments
151 .values()
152 .flat_map(|fragments| fragments.keys().copied())
153 .collect();
154
155 let job_extra_info = self
156 .job_extra_info
157 .iter()
158 .filter(|(job_id, _)| job_ids.contains(*job_id))
159 .map(|(job_id, info)| (*job_id, info.clone()))
160 .collect();
161
162 let upstream_fragments = self
163 .upstream_fragments
164 .iter()
165 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
166 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
167 .collect();
168
169 let downstream_fragments = self
170 .downstream_fragments
171 .iter()
172 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
173 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
174 .collect();
175
176 let downstream_relations = self
177 .downstream_relations
178 .iter()
179 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
183 .map(|(key, relation)| (*key, relation.clone()))
184 .collect();
185
186 Some(Self {
187 loaded,
188 job_extra_info,
189 upstream_fragments,
190 downstream_fragments,
191 downstream_relations,
192 })
193 }
194
195 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
198 let Self {
199 loaded,
200 job_extra_info,
201 upstream_fragments,
202 downstream_fragments,
203 downstream_relations,
204 } = self;
205
206 let mut contexts: HashMap<_, _> = loaded
207 .into_database_contexts()
208 .into_iter()
209 .map(|(database_id, loaded)| {
210 (
211 database_id,
212 Self {
213 loaded,
214 job_extra_info: HashMap::new(),
215 upstream_fragments: HashMap::new(),
216 downstream_fragments: HashMap::new(),
217 downstream_relations: HashMap::new(),
218 },
219 )
220 })
221 .collect();
222
223 if contexts.is_empty() {
224 return contexts;
225 }
226
227 let mut job_databases = HashMap::new();
228 let mut fragment_databases = HashMap::new();
229 for (&database_id, context) in &contexts {
230 for job_id in context.loaded.job_map.keys().copied() {
231 job_databases.insert(job_id, database_id);
232 }
233 for fragment_id in context
234 .loaded
235 .job_fragments
236 .values()
237 .flat_map(|fragments| fragments.keys().copied())
238 {
239 fragment_databases.insert(fragment_id, database_id);
240 }
241 }
242
243 for (job_id, info) in job_extra_info {
244 if let Some(database_id) = job_databases.get(&job_id).copied() {
245 contexts
246 .get_mut(&database_id)
247 .expect("database context should exist for job")
248 .job_extra_info
249 .insert(job_id, info);
250 }
251 }
252
253 for (fragment_id, upstreams) in upstream_fragments {
254 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
255 contexts
256 .get_mut(&database_id)
257 .expect("database context should exist for fragment")
258 .upstream_fragments
259 .insert(fragment_id, upstreams);
260 }
261 }
262
263 for (fragment_id, downstreams) in downstream_fragments {
264 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
265 contexts
266 .get_mut(&database_id)
267 .expect("database context should exist for fragment")
268 .downstream_fragments
269 .insert(fragment_id, downstreams);
270 }
271 }
272
273 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
274 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
277 contexts
278 .get_mut(&database_id)
279 .expect("database context should exist for relation source")
280 .downstream_relations
281 .insert((source_fragment_id, target_fragment_id), relation);
282 }
283 }
284
285 contexts
286 }
287}
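
// A minimal usage sketch (hypothetical call sites, not part of this module):
// a context loaded for all databases can either be borrowed per database or
// consumed and partitioned in one pass.
//
//     // Borrow one database's slice; `None` if nothing was loaded for it.
//     let db_ctx: Option<RescheduleContext> = context.for_database(database_id);
//
//     // Or consume the whole context and split it per database.
//     let per_db: HashMap<DatabaseId, RescheduleContext> =
//         context.into_database_contexts();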

/// Plan for replacing a streaming job: drop the old fragments, bring up the
/// new ones, and rewire upstream and downstream fragments accordingly.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub database_resource_group: String,
    /// Downstream fragments that must switch their `Merge` upstream from an
    /// old fragment to its replacement.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub split_plan: ReplaceJobSplitPlan,
    pub streaming_job: StreamingJob,
    pub streaming_job_model: streaming_job::Model,
    pub tmp_id: JobId,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
    /// Maps each replaced (old) fragment id to its replacement, asserting
    /// that no fragment is replaced by more than one fragment.
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            if let Some(r) =
                fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id)
            {
                assert_eq!(
                    *new_upstream_fragment_id, r,
                    "one fragment is replaced by multiple fragments"
                );
            }
        }
        fragment_replacements
    }
}
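
// Illustration of the `fragment_replacements` invariant (hypothetical ids):
// the same old fragment may appear under several downstream fragments' merge
// updates, but it must always map to the same replacement.
//
//     // replace_upstream: {100: {1 -> 11}, 200: {1 -> 11, 2 -> 12}}
//     // fragment_replacements() == {1: 11, 2: 12}
//     // A conflicting {1 -> 13} would trip the assertion above.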

/// Everything needed to create a streaming job: its fragments, how they hook
/// into upstream fragments, initial source splits, backfill ordering, and the
/// catalog-level job metadata.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub database_resource_group: String,
    pub init_split_assignment: SourceSplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    pub streaming_job_model: streaming_job::Model,
}

impl StreamJobFragments {
    /// Builds the in-flight info for each fragment of this job, combining the
    /// given actors with their worker locations and the initial split
    /// assignment.
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &'a HashMap<ActorId, WorkerId>,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: stream_actors
                        .get(&fragment.fragment_id)
                        .into_iter()
                        .flatten()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: actor_location[&actor.actor_id],
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: assignment
                                        .get(&fragment.fragment_id)
                                        .and_then(|s| s.get(&actor.actor_id))
                                        .cloned()
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}
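
// Sketch of typical consumption (hypothetical variables): the returned
// iterator pairs each fragment id with its in-flight info and is usually
// collected into a map when registering the job's fragments.
//
//     let infos: HashMap<FragmentId, InflightFragmentInfo> = job_fragments
//         .new_fragment_info(&stream_actors, &actor_location, &assignment)
//         .collect();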

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream materialized-view table ids to backfill from, mapped to their
    /// backfill epoch. The epoch is `None` until it is determined when the
    /// command is actually injected.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the action the barrier manager attaches to a barrier before
/// injecting it into the stream graph. Most variants translate into a barrier
/// [`Mutation`] via the `*_to_mutation` helpers below.
#[derive(Debug)]
pub enum Command {
    /// A barrier that carries no mutation; used to force a checkpoint.
    Flush,

    /// Pause all stream dataflow.
    Pause,

    /// Resume a previously paused stream dataflow.
    Resume,

    /// Drop streaming jobs and stop the actors of all their fragments.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Create a streaming job: add its newly created actors and dispatchers
    /// in a single `Add` mutation.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule fragments. The plan may be precomputed, or left `None` to
    /// be derived from the context when the command is handled.
    RescheduleIntent {
        context: RescheduleContext,
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replace an existing streaming job with a new set of fragments.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Reassign source splits after a connector-side split change.
    SourceChangeSplit(SplitState),

    /// Throttle the given fragments of the given jobs.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Create a subscription on a materialized view.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drop a subscription from a materialized view.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Change the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Alter connector properties of sources or sinks at runtime.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Refresh a table backed by a refreshable source; `ListFinish` and
    /// `LoadFinish` mark the completion of the corresponding phases.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Reset a source (maps to a `ResetSourceMutation`).
    ResetSource {
        source_id: SourceId,
    },

    /// Resume backfill for a job or a single backfill fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Inject explicit per-split offsets into a source.
    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}

#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    Job(JobId),
    Fragment(FragmentId),
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                if reschedule_plan.is_some() {
                    write!(f, "RescheduleIntent(planned)")
                } else {
                    write!(f, "RescheduleIntent")
                }
            }
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle { .. } => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::AlterSubscriptionRetention {
                subscription_id,
                retention_second,
                ..
            } => write!(
                f,
                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
            ),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(f, "Refresh: {table_id} (source: {associated_source_id})"),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(f, "ListFinish: {table_id} (source: {associated_source_id})"),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(f, "LoadFinish: {table_id} (source: {associated_source_id})"),
            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
            Command::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    write!(f, "ResumeBackfill: job={job_id}")
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    write!(f, "ResumeBackfill: fragment={fragment_id}")
                }
            },
            Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            } => write!(
                f,
                "InjectSourceOffsets: {} ({} splits)",
                source_id,
                split_offsets.len()
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    /// Whether this command must be attached to a checkpoint barrier.
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}
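
// A minimal sketch (not part of the original module) exercising the
// checkpoint gating above: every command except `Resume` demands a checkpoint.
#[cfg(test)]
mod command_checkpoint_sketch {
    use super::Command;

    #[test]
    fn only_resume_skips_checkpoint() {
        assert!(Command::Flush.need_checkpoint());
        assert!(Command::pause().need_checkpoint());
        assert!(!Command::resume().need_checkpoint());
    }
}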

/// The slimmed-down command kept around after a barrier has been collected;
/// it carries only what post-collection handling still needs.
#[derive(Debug)]
pub enum PostCollectCommand {
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}

impl PostCollectCommand {
    pub fn barrier() -> Self {
        PostCollectCommand::Command("barrier".to_owned())
    }

    pub fn command_name(&self) -> &str {
        match self {
            PostCollectCommand::Command(name) => name.as_str(),
            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
            PostCollectCommand::Reschedule { .. } => "Reschedule",
            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
        }
    }
}

impl Display for PostCollectCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.command_name())
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// A checkpoint barrier, carrying the epochs covered by this checkpoint.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}
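
// A minimal sketch (not part of the original module) of `BarrierKind`
// semantics: only `Checkpoint` reports as a checkpoint, and the string names
// mirror the protobuf variants.
#[cfg(test)]
mod barrier_kind_sketch {
    use super::BarrierKind;

    #[test]
    fn checkpoint_kind_predicates() {
        let kind = BarrierKind::Checkpoint(vec![233]);
        assert!(kind.is_checkpoint());
        assert!(!kind.is_initial());
        assert_eq!(kind.as_str_name(), "Checkpoint");
    }
}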

/// Context attached to an in-flight barrier after its command has been
/// collected; carries what the commit phase needs.
pub(super) struct CommandContext {
    /// Max retention (in seconds) per subscribed materialized-view table,
    /// used to compute log-store truncation epochs.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    /// The tracing span of the barrier, kept alive for the lifetime of this
    /// command context.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command.command_name())
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Computes the epoch up to which a subscribed MV's log store may be
    /// truncated, i.e. `prev_epoch` minus the retention period. Falls back to
    /// `prev_epoch` itself if the subtraction leaves the valid timestamp range.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }
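
    // Worked example with illustrative numbers: if `prev_epoch` corresponds
    // to unix time 1_700_000_000 s and `retention_second` is 86_400, the
    // truncate epoch is derived from 1_699_913_600 s, i.e. one day before
    // the barrier's prev epoch.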

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // Keep the smallest truncate epoch per table: a log store may be
        // pinned both by subscriptions and by ongoing backfills.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Emits a `Pause` mutation unless the dataflow is already paused.
    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
        if !is_currently_paused {
            Some(Mutation::Pause(PauseMutation {}))
        } else {
            None
        }
    }

    /// Emits a `Resume` mutation only if the dataflow is currently paused.
    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
        if is_currently_paused {
            Some(Mutation::Resume(ResumeMutation {}))
        } else {
            None
        }
    }
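
    // Gating sketch (hypothetical values): the two helpers are symmetric
    // no-ops when the requested state already holds.
    //
    //     assert!(Command::pause_to_mutation(false).is_some());
    //     assert!(Command::pause_to_mutation(true).is_none());
    //     assert!(Command::resume_to_mutation(true).is_some());
    //     assert!(Command::resume_to_mutation(false).is_none());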

    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
        let mut diff = HashMap::new();

        for actor_splits in split_assignment.values() {
            diff.extend(actor_splits.clone());
        }

        Mutation::Splits(SourceChangeSplitMutation {
            actor_splits: build_actor_connector_splits(&diff),
        })
    }

    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
        Mutation::Throttle(ThrottleMutation {
            fragment_throttle: config.clone(),
        })
    }

    pub(super) fn drop_streaming_jobs_to_mutation(
        actors: &Vec<ActorId>,
        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
    ) -> Mutation {
        Mutation::Stop(StopMutation {
            actors: actors.clone(),
            dropped_sink_fragments: dropped_sink_fragment_by_targets
                .values()
                .flatten()
                .cloned()
                .collect(),
        })
    }

    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            fragment_backfill_ordering,
            streaming_job,
            ..
        } = info;
        let database_id = streaming_job.database_id();
        let added_actors: Vec<ActorId> = stream_actors
            .values()
            .flatten()
            .map(|actor| actor.actor_id)
            .collect();
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        let subscriptions_to_add =
            if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) = job_type {
                snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                    .map(|table_id| SubscriptionUpstreamInfo {
                        subscriber_id: stream_job_fragments.stream_job_id().as_subscriber_id(),
                        upstream_mv_table_id: *table_id,
                    })
                    .collect()
            } else {
                Default::default()
            };
        let backfill_nodes_to_pause: Vec<_> =
            get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                .into_iter()
                .collect();

        let new_upstream_sinks = if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
            sink_fragment_id,
            sink_output_fields,
            project_exprs,
            new_sink_downstream,
            ..
        }) = job_type
        {
            let new_sink_actors = stream_actors
                .get(sink_fragment_id)
                .unwrap_or_else(|| panic!("upstream sink fragment {sink_fragment_id} not exist"))
                .iter()
                .map(|actor| {
                    let worker_id = actor_location[&actor.actor_id];
                    PbActorInfo {
                        actor_id: actor.actor_id,
                        host: Some(control_stream_manager.host_addr(worker_id)),
                        partial_graph_id: to_partial_graph_id(database_id, None),
                    }
                });
            let new_upstream_sink = PbNewUpstreamSink {
                info: Some(PbUpstreamSinkInfo {
                    upstream_fragment_id: *sink_fragment_id,
                    sink_output_schema: sink_output_fields.clone(),
                    project_exprs: project_exprs.clone(),
                }),
                upstream_actors: new_sink_actors.collect(),
            };
            HashMap::from([(new_sink_downstream.downstream_fragment_id, new_upstream_sink)])
        } else {
            HashMap::new()
        };

        let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

        let add_mutation = AddMutation {
            actor_dispatchers: edges
                .dispatchers
                .extract_if(|fragment_id, _| {
                    upstream_fragment_downstreams.contains_key(fragment_id)
                })
                .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                .collect(),
            added_actors,
            actor_splits,
            // If the cluster is currently paused, the newly created actors
            // should start paused as well.
            pause: is_currently_paused,
            subscriptions_to_add,
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits,
            new_upstream_sinks,
        };

        Ok(Mutation::Add(add_mutation))
    }

    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        let merge_updates = edges
            .merge_updates
            .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
            .collect();
        let dispatchers = edges
            .dispatchers
            .extract_if(|fragment_id, _| upstream_fragment_downstreams.contains_key(fragment_id))
            .collect();
        let actor_cdc_table_snapshot_splits = database_info
            .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
        let old_fragments = old_fragments.fragments.keys().copied();
        let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
            .as_ref()
            .into_iter()
            .flat_map(|sinks| sinks.iter())
            .map(|sink| sink.original_fragment.fragment_id);
        Ok(Self::generate_update_mutation_for_replace_table(
            old_fragments
                .chain(auto_refresh_sink_fragment_ids)
                .flat_map(|fragment_id| {
                    database_info.fragment(fragment_id).actors.keys().copied()
                }),
            merge_updates,
            dispatchers,
            split_assignment,
            actor_cdc_table_snapshot_splits,
            auto_refresh_schema_sinks.as_ref(),
        ))
    }

    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let database_id = database_info.database_id;
        let mut dispatcher_update = HashMap::new();
        for reschedule in reschedules.values() {
            for &(upstream_fragment_id, dispatcher_id) in
                &reschedule.upstream_fragment_dispatcher_ids
            {
                let upstream_actor_ids = fragment_actors
                    .get(&upstream_fragment_id)
                    .expect("should contain");

                let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                for &actor_id in upstream_actor_ids {
                    // Upstream actors that are themselves removed get no added
                    // downstream actors; their dispatchers are dropped anyway.
                    let added_downstream_actor_id = if upstream_reschedule
                        .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                        .unwrap_or(true)
                    {
                        reschedule
                            .added_actors
                            .values()
                            .flatten()
                            .cloned()
                            .collect()
                    } else {
                        Default::default()
                    };
                    dispatcher_update
                        .try_insert(
                            (actor_id, dispatcher_id),
                            DispatcherUpdate {
                                actor_id,
                                dispatcher_id,
                                hash_mapping: reschedule
                                    .upstream_dispatcher_mapping
                                    .as_ref()
                                    .map(|m| m.to_protobuf()),
                                added_downstream_actor_id,
                                removed_downstream_actor_id: reschedule
                                    .removed_actors
                                    .iter()
                                    .cloned()
                                    .collect(),
                            },
                        )
                        .unwrap();
                }
            }
        }
        let dispatcher_update = dispatcher_update.into_values().collect();

        let mut merge_update = HashMap::new();
        for (&fragment_id, reschedule) in reschedules {
            for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                let downstream_actor_ids = fragment_actors
                    .get(&downstream_fragment_id)
                    .expect("should contain");

                let downstream_removed_actors: HashSet<_> = reschedules
                    .get(&downstream_fragment_id)
                    .map(|downstream_reschedule| {
                        downstream_reschedule
                            .removed_actors
                            .iter()
                            .copied()
                            .collect()
                    })
                    .unwrap_or_default();

                for &actor_id in downstream_actor_ids {
                    if downstream_removed_actors.contains(&actor_id) {
                        continue;
                    }

                    merge_update
                        .try_insert(
                            (actor_id, fragment_id),
                            MergeUpdate {
                                actor_id,
                                upstream_fragment_id: fragment_id,
                                new_upstream_fragment_id: None,
                                added_upstream_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(worker_id, actors)| {
                                        let host =
                                            control_stream_manager.host_addr(*worker_id);
                                        actors.iter().map(move |&actor_id| PbActorInfo {
                                            actor_id,
                                            host: Some(host.clone()),
                                            partial_graph_id: to_partial_graph_id(
                                                database_id,
                                                None,
                                            ),
                                        })
                                    })
                                    .collect(),
                                removed_upstream_actor_id: reschedule
                                    .removed_actors
                                    .iter()
                                    .cloned()
                                    .collect(),
                            },
                        )
                        .unwrap();
                }
            }
        }
        let merge_update = merge_update.into_values().collect();

        let mut actor_vnode_bitmap_update = HashMap::new();
        for reschedule in reschedules.values() {
            for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                let bitmap = bitmap.to_protobuf();
                actor_vnode_bitmap_update
                    .try_insert(actor_id, bitmap)
                    .unwrap();
            }
        }
        let dropped_actors = reschedules
            .values()
            .flat_map(|r| r.removed_actors.iter().copied())
            .collect();
        let mut actor_splits = HashMap::new();
        let mut actor_cdc_table_snapshot_splits = HashMap::new();
        for (fragment_id, reschedule) in reschedules {
            for (actor_id, splits) in &reschedule.actor_splits {
                actor_splits.insert(
                    *actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                );
            }

            if let Some(assignment) =
                database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
            {
                actor_cdc_table_snapshot_splits.extend(assignment)
            }
        }

        // No new dispatchers are created in the reschedule scenario.
        let actor_new_dispatchers = HashMap::new();
        let mutation = Mutation::Update(UpdateMutation {
            dispatcher_update,
            merge_update,
            actor_vnode_bitmap_update,
            dropped_actors,
            actor_splits,
            actor_new_dispatchers,
            actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                splits: actor_cdc_table_snapshot_splits,
            }),
            sink_schema_change: Default::default(),
            subscriptions_to_drop: vec![],
        });
        tracing::debug!("update mutation: {mutation:?}");
        Ok(Some(mutation))
    }
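
    // Shape of the resulting `UpdateMutation` (illustrative): dispatcher
    // updates are keyed by (upstream actor, dispatcher id) and merge updates
    // by (downstream actor, upstream fragment id); both use `try_insert`, so
    // a duplicate key would panic and indicates a planning bug upstream.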

    pub(super) fn create_subscription_to_mutation(
        upstream_mv_table_id: TableId,
        subscription_id: SubscriptionId,
    ) -> Mutation {
        Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: vec![],
            actor_splits: Default::default(),
            pause: false,
            subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                upstream_mv_table_id,
                subscriber_id: subscription_id.as_subscriber_id(),
            }],
            backfill_nodes_to_pause: vec![],
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        })
    }

    pub(super) fn drop_subscription_to_mutation(
        upstream_mv_table_id: TableId,
        subscription_id: SubscriptionId,
    ) -> Mutation {
        Mutation::DropSubscriptions(DropSubscriptionsMutation {
            info: vec![SubscriptionUpstreamInfo {
                subscriber_id: subscription_id.as_subscriber_id(),
                upstream_mv_table_id,
            }],
        })
    }
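
    // The two helpers above are symmetric: both identify the subscription by
    // the (subscriber_id, upstream_mv_table_id) pair, and the create side
    // rides an otherwise-empty `AddMutation` that adds no actors or splits.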

    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
        let mut connector_props_infos = HashMap::default();
        for (k, v) in config {
            connector_props_infos.insert(
                k.as_raw_id(),
                ConnectorPropsInfo {
                    connector_props_info: v.clone(),
                },
            );
        }
        Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
            connector_props_infos,
        })
    }

    pub(super) fn refresh_to_mutation(
        table_id: TableId,
        associated_source_id: SourceId,
    ) -> Mutation {
        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
            table_id,
            associated_source_id,
        })
    }

    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
        Mutation::ListFinish(ListFinishMutation {
            associated_source_id,
        })
    }

    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
        Mutation::LoadFinish(LoadFinishMutation {
            associated_source_id,
        })
    }

    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
            source_id: source_id.as_raw_id(),
        })
    }

    pub(super) fn resume_backfill_to_mutation(
        target: &ResumeBackfillTarget,
        database_info: &InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let fragment_ids: HashSet<_> = match target {
            ResumeBackfillTarget::Job(job_id) => {
                database_info.backfill_fragment_ids_for_job(*job_id)?
            }
            ResumeBackfillTarget::Fragment(fragment_id) => {
                if !database_info.is_backfill_fragment(*fragment_id)? {
                    return Err(MetaError::invalid_parameter(format!(
                        "fragment {} is not a backfill node",
                        fragment_id
                    )));
                }
                HashSet::from([*fragment_id])
            }
        };
        if fragment_ids.is_empty() {
            warn!(
                ?target,
                "resume backfill command ignored because no backfill fragments found"
            );
            Ok(None)
        } else {
            Ok(Some(Mutation::StartFragmentBackfill(
                StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.into_iter().collect(),
                },
            )))
        }
    }

    pub(super) fn inject_source_offsets_to_mutation(
        source_id: SourceId,
        split_offsets: &HashMap<String, String>,
    ) -> Mutation {
        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
            source_id: source_id.as_raw_id(),
            split_offsets: split_offsets.clone(),
        })
    }
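
    // Hypothetical usage sketch: seed two splits of a source with explicit
    // offsets (split name -> offset, both as strings).
    //
    //     let offsets = HashMap::from([
    //         ("split-0".to_owned(), "42".to_owned()),
    //         ("split-1".to_owned(), "17".to_owned()),
    //     ]);
    //     let mutation = Command::inject_source_offsets_to_mutation(source_id, &offsets);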

    pub(super) fn create_streaming_job_actors_to_create(
        info: &CreateStreamingJobCommandInfo,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> StreamJobActorsToCreate {
        edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
            |fragment| {
                let actors = stream_actors
                    .get(&fragment.fragment_id)
                    .into_iter()
                    .flatten()
                    .map(|actor| (actor, actor_location[&actor.actor_id]));
                (fragment.fragment_id, &fragment.nodes, actors, [])
            },
        ))
    }

    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
            reschedules.iter().map(|(fragment_id, reschedule)| {
                (
                    *fragment_id,
                    reschedule.newly_created_actors.values().map(
                        |((actor, dispatchers), _)| {
                            (actor.actor_id, dispatchers.as_slice())
                        },
                    ),
                )
            }),
            Some((reschedules, fragment_actors)),
            database_info,
            control_stream_manager,
        );
        let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
        for (fragment_id, (actor, dispatchers), worker_id) in
            reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                reschedule
                    .newly_created_actors
                    .values()
                    .map(|(actors, status)| (*fragment_id, actors, status))
            })
        {
            let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
            map.entry(*worker_id)
                .or_default()
                .entry(fragment_id)
                .or_insert_with(|| {
                    let node = database_info.fragment(fragment_id).nodes.clone();
                    let subscribers =
                        database_info.fragment_subscribers(fragment_id).collect();
                    (node, vec![], subscribers)
                })
                .1
                .push((actor.clone(), upstreams, dispatchers.clone()));
        }
        map
    }

    pub(super) fn replace_stream_job_actors_to_create(
        replace_table: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &InflightDatabaseInfo,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> StreamJobActorsToCreate {
        let mut actors = edges.collect_actors_to_create(
            replace_table
                .new_fragments
                .fragments
                .values()
                .map(|fragment| {
                    let actors = stream_actors
                        .get(&fragment.fragment_id)
                        .into_iter()
                        .flatten()
                        .map(|actor| (actor, actor_location[&actor.actor_id]));
                    (
                        fragment.fragment_id,
                        &fragment.nodes,
                        actors,
                        database_info
                            .job_subscribers(replace_table.old_fragments.stream_job_id),
                    )
                }),
        );

        if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
            let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                (
                    sink.new_fragment.fragment_id,
                    &sink.new_fragment.nodes,
                    stream_actors
                        .get(&sink.new_fragment.fragment_id)
                        .into_iter()
                        .flatten()
                        .map(|actor| (actor, actor_location[&actor.actor_id])),
                    database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                )
            }));
            for (worker_id, fragment_actors) in sink_actors {
                actors.entry(worker_id).or_default().extend(fragment_actors);
            }
        }
        actors
    }

    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        let op = if !sink.removed_column_names.is_empty() {
                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
                                column_names: sink.removed_column_names.clone(),
                            })
                        } else {
                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            })
                        };
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(op),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }
}

impl Command {
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}