1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49 PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50 StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68 FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
69 StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
74 build_actor_connector_splits,
75};
76use crate::{MetaError, MetaResult};
77
/// Describes how a single fragment is rescheduled: which actors are added or
/// removed, how vnodes are redistributed, and which neighboring fragments must
/// update their dispatchers / merge executors accordingly.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Actors newly added to this fragment, grouped by the worker they are placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmap for each surviving actor whose vnode assignment changed.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream_fragment_id, dispatcher_id)` pairs whose dispatchers must be
    /// updated to point at the new actor set of this fragment.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping for upstream dispatchers, if the distribution changed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose merge executors must be updated with the new
    /// upstream actor set.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source split assignment per actor after the reschedule.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors (with their dispatchers) and the worker each runs on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
109
/// A fully computed reschedule: the per-fragment [`Reschedule`]s plus the actor
/// sets of every fragment referenced by them (including upstream/downstream
/// fragments needed to build dispatcher and merge updates).
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    pub reschedules: HashMap<FragmentId, Reschedule>,
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
117
/// Metadata pre-loaded from the meta store that is needed to plan and apply a
/// reschedule.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment data, keyed by job/database inside.
    pub loaded: LoadedFragmentContext,
    /// Extra per-job info for the loaded jobs.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment: its upstream fragments and the dispatcher type in between.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment: its downstream fragments and the dispatcher type in between.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// `(source_fragment, target_fragment)` -> the relation row connecting them.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
127
128impl RescheduleContext {
129 pub fn empty() -> Self {
130 Self {
131 loaded: LoadedFragmentContext::default(),
132 job_extra_info: HashMap::new(),
133 upstream_fragments: HashMap::new(),
134 downstream_fragments: HashMap::new(),
135 downstream_relations: HashMap::new(),
136 }
137 }
138
139 pub fn is_empty(&self) -> bool {
140 self.loaded.is_empty()
141 }
142
143 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
144 let loaded = self.loaded.for_database(database_id)?;
145 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
146 let fragment_ids: HashSet<FragmentId> = loaded
149 .job_fragments
150 .values()
151 .flat_map(|fragments| fragments.keys().copied())
152 .collect();
153
154 let job_extra_info = self
155 .job_extra_info
156 .iter()
157 .filter(|(job_id, _)| job_ids.contains(*job_id))
158 .map(|(job_id, info)| (*job_id, info.clone()))
159 .collect();
160
161 let upstream_fragments = self
162 .upstream_fragments
163 .iter()
164 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
165 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
166 .collect();
167
168 let downstream_fragments = self
169 .downstream_fragments
170 .iter()
171 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
172 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
173 .collect();
174
175 let downstream_relations = self
176 .downstream_relations
177 .iter()
178 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
182 .map(|(key, relation)| (*key, relation.clone()))
183 .collect();
184
185 Some(Self {
186 loaded,
187 job_extra_info,
188 upstream_fragments,
189 downstream_fragments,
190 downstream_relations,
191 })
192 }
193
194 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
197 let Self {
198 loaded,
199 job_extra_info,
200 upstream_fragments,
201 downstream_fragments,
202 downstream_relations,
203 } = self;
204
205 let mut contexts: HashMap<_, _> = loaded
206 .into_database_contexts()
207 .into_iter()
208 .map(|(database_id, loaded)| {
209 (
210 database_id,
211 Self {
212 loaded,
213 job_extra_info: HashMap::new(),
214 upstream_fragments: HashMap::new(),
215 downstream_fragments: HashMap::new(),
216 downstream_relations: HashMap::new(),
217 },
218 )
219 })
220 .collect();
221
222 if contexts.is_empty() {
223 return contexts;
224 }
225
226 let mut job_databases = HashMap::new();
227 let mut fragment_databases = HashMap::new();
228 for (&database_id, context) in &contexts {
229 for job_id in context.loaded.job_map.keys().copied() {
230 job_databases.insert(job_id, database_id);
231 }
232 for fragment_id in context
233 .loaded
234 .job_fragments
235 .values()
236 .flat_map(|fragments| fragments.keys().copied())
237 {
238 fragment_databases.insert(fragment_id, database_id);
239 }
240 }
241
242 for (job_id, info) in job_extra_info {
243 if let Some(database_id) = job_databases.get(&job_id).copied() {
244 contexts
245 .get_mut(&database_id)
246 .expect("database context should exist for job")
247 .job_extra_info
248 .insert(job_id, info);
249 }
250 }
251
252 for (fragment_id, upstreams) in upstream_fragments {
253 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
254 contexts
255 .get_mut(&database_id)
256 .expect("database context should exist for fragment")
257 .upstream_fragments
258 .insert(fragment_id, upstreams);
259 }
260 }
261
262 for (fragment_id, downstreams) in downstream_fragments {
263 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
264 contexts
265 .get_mut(&database_id)
266 .expect("database context should exist for fragment")
267 .downstream_fragments
268 .insert(fragment_id, downstreams);
269 }
270 }
271
272 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
273 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
276 contexts
277 .get_mut(&database_id)
278 .expect("database context should exist for relation source")
279 .downstream_relations
280 .insert((source_fragment_id, target_fragment_id), relation);
281 }
282 }
283
284 contexts
285 }
286}
287
/// Everything needed to replace the fragments of an existing streaming job in
/// a single barrier: the old fragments to tear down, the new ones to bring up,
/// and the upstream rewiring in between.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments currently serving the job; removed by the replacement.
    pub old_fragments: StreamJobFragments,
    /// Fragments that take over the job.
    pub new_fragments: StreamJobFragmentsToCreate,
    pub database_resource_group: String,
    /// Per-fragment map from old upstream fragment id to its replacement
    /// (consumed by `fragment_replacements`).
    pub replace_upstream: FragmentReplaceUpstream,
    /// New dispatchers from upstream fragments to the new fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub split_plan: ReplaceJobSplitPlan,
    pub streaming_job: StreamingJob,
    pub streaming_job_model: streaming_job::Model,
    // NOTE(review): presumably the temporary job id used while the replacement
    // is in flight — confirm against the caller.
    pub tmp_id: JobId,
    /// State tables dropped together with the old fragments.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed as part of this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
317
318impl ReplaceStreamJobPlan {
319 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
321 let mut fragment_replacements = HashMap::new();
322 for (upstream_fragment_id, new_upstream_fragment_id) in
323 self.replace_upstream.values().flatten()
324 {
325 {
326 let r =
327 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
328 if let Some(r) = r {
329 assert_eq!(
330 *new_upstream_fragment_id, r,
331 "one fragment is replaced by multiple fragments"
332 );
333 }
334 }
335 }
336 fragment_replacements
337 }
338}
339
/// Everything the barrier manager needs to create a streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Excluded from Debug output: large fragment graph.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// New dispatchers from upstream fragments into the job's fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub database_resource_group: String,
    /// Initial source split assignment for the job's source fragments.
    pub init_split_assignment: SourceSplitAssignment,
    // NOTE(review): presumably the job's SQL definition text — confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering; nodes with backfill dependencies start paused (see
    /// `get_nodes_with_backfill_dependencies` usage in this file).
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, if this job backfills a CDC table.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    pub streaming_job_model: streaming_job::Model,
}
361
362impl StreamJobFragments {
363 pub(super) fn new_fragment_info<'a>(
366 &'a self,
367 stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
368 actor_location: &'a HashMap<ActorId, WorkerId>,
369 assignment: &'a SplitAssignment,
370 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
371 self.fragments.values().map(|fragment| {
372 (
373 fragment.fragment_id,
374 InflightFragmentInfo {
375 fragment_id: fragment.fragment_id,
376 distribution_type: fragment.distribution_type.into(),
377 fragment_type_mask: fragment.fragment_type_mask,
378 vnode_count: fragment.vnode_count(),
379 nodes: fragment.nodes.clone(),
380 actors: stream_actors
381 .get(&fragment.fragment_id)
382 .into_iter()
383 .flatten()
384 .map(|actor| {
385 (
386 actor.actor_id,
387 InflightActorInfo {
388 worker_id: actor_location[&actor.actor_id],
389 vnode_bitmap: actor.vnode_bitmap.clone(),
390 splits: assignment
391 .get(&fragment.fragment_id)
392 .and_then(|s| s.get(&actor.actor_id))
393 .cloned()
394 .unwrap_or_default(),
395 },
396 )
397 })
398 .collect(),
399 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
400 },
401 )
402 })
403 }
404}
405
/// Info for a snapshot-backfill job: the upstream MV tables it backfills from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Upstream MV table id -> backfill epoch.
    // NOTE(review): value is presumably `None` until the backfill epoch is
    // determined — confirm where it is filled in.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
413
/// How a streaming job is created.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Regular creation.
    Normal,
    /// Creation of a sink-into-table branch; carries the upstream sink info
    /// used to wire the sink fragment into the target table's fragment.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation that backfills from upstream MV snapshots; registers temporary
    /// subscriptions on the upstream MV tables (see `create_streaming_job_to_mutation`).
    SnapshotBackfill(SnapshotBackfillInfo),
}
420
/// Commands that can be carried by a barrier and injected into the streaming
/// graph. Each command is translated into a barrier mutation by the
/// `*_to_mutation` helpers in this file and post-processed after the barrier
/// is collected (see [`PostCollectCommand`]).
#[derive(Debug)]
pub enum Command {
    /// A barrier with no other effect; used to force a flush/checkpoint.
    Flush,

    /// Pauses all stream processing (no mutation is emitted if already paused).
    Pause,

    /// Resumes stream processing (no mutation is emitted if not paused).
    Resume,

    /// Drops a set of streaming jobs and everything registered under them.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        /// State tables to unregister from the hummock store.
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// Target fragment -> sink fragments dropped from it (sink-into-table).
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Creates a new streaming job with its fragments and initial splits.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// Snapshot-backfill info for upstream tables in *other* databases.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Intent to reschedule; the plan may already be computed (`Some`) or be
    /// derived later from the pre-loaded `context`.
    RescheduleIntent {
        context: RescheduleContext,
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replaces the fragments of an existing job (see [`ReplaceStreamJobPlan`]).
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Propagates a source split change to the affected actors.
    SourceChangeSplit(SplitState),

    /// Applies per-fragment throttle configs to the given jobs.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Creates a subscription on an upstream MV table.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Drops a subscription from an upstream MV table.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Changes the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Updates connector properties of running connectors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Refreshes a refreshable table backed by the given source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals that the listing phase of a refresh has finished.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals that the loading phase of a refresh has finished.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Resets the state of a source.
    ResetSource {
        source_id: SourceId,
    },

    /// Resumes a previously paused backfill for a job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Overrides persisted consumption offsets of a source's splits.
    InjectSourceOffsets {
        source_id: SourceId,
        // NOTE(review): presumably split id -> offset — confirm against the
        // source connector.
        split_offsets: HashMap<String, String>,
    },
}
560
/// What to resume backfill for: a whole streaming job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    Job(JobId),
    Fragment(FragmentId),
}
566
567impl std::fmt::Display for Command {
569 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
570 match self {
571 Command::Flush => write!(f, "Flush"),
572 Command::Pause => write!(f, "Pause"),
573 Command::Resume => write!(f, "Resume"),
574 Command::DropStreamingJobs {
575 streaming_job_ids, ..
576 } => {
577 write!(
578 f,
579 "DropStreamingJobs: {}",
580 streaming_job_ids.iter().sorted().join(", ")
581 )
582 }
583 Command::CreateStreamingJob { info, .. } => {
584 write!(f, "CreateStreamingJob: {}", info.streaming_job)
585 }
586 Command::RescheduleIntent {
587 reschedule_plan, ..
588 } => {
589 if reschedule_plan.is_some() {
590 write!(f, "RescheduleIntent(planned)")
591 } else {
592 write!(f, "RescheduleIntent")
593 }
594 }
595 Command::ReplaceStreamJob(plan) => {
596 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
597 }
598 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
599 Command::Throttle { .. } => write!(f, "Throttle"),
600 Command::CreateSubscription {
601 subscription_id, ..
602 } => write!(f, "CreateSubscription: {subscription_id}"),
603 Command::DropSubscription {
604 subscription_id, ..
605 } => write!(f, "DropSubscription: {subscription_id}"),
606 Command::AlterSubscriptionRetention {
607 subscription_id,
608 retention_second,
609 ..
610 } => write!(
611 f,
612 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
613 ),
614 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
615 Command::Refresh {
616 table_id,
617 associated_source_id,
618 } => write!(
619 f,
620 "Refresh: {} (source: {})",
621 table_id, associated_source_id
622 ),
623 Command::ListFinish {
624 table_id,
625 associated_source_id,
626 } => write!(
627 f,
628 "ListFinish: {} (source: {})",
629 table_id, associated_source_id
630 ),
631 Command::LoadFinish {
632 table_id,
633 associated_source_id,
634 } => write!(
635 f,
636 "LoadFinish: {} (source: {})",
637 table_id, associated_source_id
638 ),
639 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
640 Command::ResumeBackfill { target } => match target {
641 ResumeBackfillTarget::Job(job_id) => {
642 write!(f, "ResumeBackfill: job={job_id}")
643 }
644 ResumeBackfillTarget::Fragment(fragment_id) => {
645 write!(f, "ResumeBackfill: fragment={fragment_id}")
646 }
647 },
648 Command::InjectSourceOffsets {
649 source_id,
650 split_offsets,
651 } => write!(
652 f,
653 "InjectSourceOffsets: {} ({} splits)",
654 source_id,
655 split_offsets.len()
656 ),
657 }
658 }
659}
660
661impl Command {
662 pub fn pause() -> Self {
663 Self::Pause
664 }
665
666 pub fn resume() -> Self {
667 Self::Resume
668 }
669
670 pub fn need_checkpoint(&self) -> bool {
671 !matches!(self, Command::Resume)
673 }
674}
675
/// The post-collection form of a [`Command`]: the payload that still needs to
/// be applied after the barrier carrying the command has been collected.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command with no post-collection payload; carries only a display name
    /// (e.g. `"barrier"` from [`PostCollectCommand::barrier`]).
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// The split assignment actually applied at injection time.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// The split assignment actually applied at injection time.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
707
708impl PostCollectCommand {
709 pub fn barrier() -> Self {
710 PostCollectCommand::Command("barrier".to_owned())
711 }
712
713 pub fn command_name(&self) -> &str {
714 match self {
715 PostCollectCommand::Command(name) => name.as_str(),
716 PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
717 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
718 PostCollectCommand::Reschedule { .. } => "Reschedule",
719 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
720 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
721 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
722 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
723 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
724 }
725 }
726}
727
728impl Display for PostCollectCommand {
729 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
730 f.write_str(self.command_name())
731 }
732}
733
/// The kind of a barrier; converted to protobuf via [`BarrierKind::to_protobuf`].
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The very first barrier.
    Initial,
    /// A regular barrier that does not commit a checkpoint.
    Barrier,
    /// A checkpoint barrier, carrying the epochs folded into this checkpoint
    /// (consumed by `build_table_change_log_delta` at commit time).
    Checkpoint(Vec<u64>),
}
741
742impl BarrierKind {
743 pub fn to_protobuf(&self) -> PbBarrierKind {
744 match self {
745 BarrierKind::Initial => PbBarrierKind::Initial,
746 BarrierKind::Barrier => PbBarrierKind::Barrier,
747 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
748 }
749 }
750
751 pub fn is_checkpoint(&self) -> bool {
752 matches!(self, BarrierKind::Checkpoint(_))
753 }
754
755 pub fn is_initial(&self) -> bool {
756 matches!(self, BarrierKind::Initial)
757 }
758
759 pub fn as_str_name(&self) -> &'static str {
760 match self {
761 BarrierKind::Initial => "Initial",
762 BarrierKind::Barrier => "Barrier",
763 BarrierKind::Checkpoint(_) => "Checkpoint",
764 }
765 }
766}
767
/// Context attached to an in-flight barrier after its command has been
/// injected; used when the barrier is collected and its epoch committed.
pub(super) struct CommandContext {
    /// Max log retention (seconds) per subscribed upstream MV table; used to
    /// compute log-store truncate epochs in `collect_commit_epoch_info`.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    /// State tables whose epoch is committed together with this barrier.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    // Held only to keep the tracing span alive for the barrier's lifetime.
    _span: tracing::Span,
}
786
787impl std::fmt::Debug for CommandContext {
788 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
789 f.debug_struct("CommandContext")
790 .field("barrier_info", &self.barrier_info)
791 .field("command", &self.command.command_name())
792 .finish()
793 }
794}
795
impl CommandContext {
    /// Bundles a barrier with the command it carries and the bookkeeping
    /// needed when the barrier is later collected and committed.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Computes the epoch up to which a subscription's log can be truncated:
    /// `prev_epoch` shifted back by `retention_second`. Falls back to
    /// `prev_epoch` itself (i.e. no retention window) when the subtraction
    /// yields a timestamp outside the representable range.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Folds the collected barrier responses and this command's effects into
    /// `info`, the hummock commit payload for this checkpoint: synced SSTs,
    /// watermarks, new table fragments, change-log deltas, log-store truncate
    /// epochs and vector-index deltas.
    ///
    /// Must only be called for checkpoint barriers — `barrier_info.kind` is
    /// asserted to be `Checkpoint` via `must_match!` below.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created job registers its MV table + internal tables as a new
        // table fragment group. Snapshot-backfill jobs take a different path.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each subscribed MV table, keep the *minimum* truncate epoch over
        // all constraints so no reader's log window is cut short.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Constraint 1: subscription retention windows.
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // Constraint 2: epochs pinned by in-progress backfills.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            // Panics unless this is a checkpoint barrier (commit only happens
            // on checkpoints).
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every table in scope commits the barrier's prev epoch exactly once.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index also needs its Init delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
925
926impl Command {
927 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
929 {
930 {
931 if !is_currently_paused {
934 Some(Mutation::Pause(PauseMutation {}))
935 } else {
936 None
937 }
938 }
939 }
940 }
941
942 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
944 {
945 {
946 if is_currently_paused {
948 Some(Mutation::Resume(ResumeMutation {}))
949 } else {
950 None
951 }
952 }
953 }
954 }
955
956 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
958 {
959 {
960 let mut diff = HashMap::new();
961
962 for actor_splits in split_assignment.values() {
963 diff.extend(actor_splits.clone());
964 }
965
966 Mutation::Splits(SourceChangeSplitMutation {
967 actor_splits: build_actor_connector_splits(&diff),
968 })
969 }
970 }
971 }
972
973 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
975 {
976 {
977 let config = config.clone();
978 Mutation::Throttle(ThrottleMutation {
979 fragment_throttle: config,
980 })
981 }
982 }
983 }
984
985 pub(super) fn drop_streaming_jobs_to_mutation(
987 actors: &Vec<ActorId>,
988 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
989 ) -> Mutation {
990 {
991 Mutation::Stop(StopMutation {
992 actors: actors.clone(),
993 dropped_sink_fragments: dropped_sink_fragment_by_targets
994 .values()
995 .flatten()
996 .cloned()
997 .collect(),
998 })
999 }
1000 }
1001
    /// Builds the `Add` mutation that injects a newly created streaming job:
    /// the new actors, their dispatchers (taken out of `edges`), initial source
    /// splits, backfill pause set, CDC snapshot splits and — for the
    /// snapshot-backfill / sink-into-table job types — the extra upstream
    /// wiring each of them needs.
    ///
    /// Note: this *removes* the matching dispatcher entries from
    /// `edges.dispatchers` via `extract_if`.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // Every actor of the new job is "added" by this barrier.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs subscribe to each upstream MV table so
                // its log is retained while the backfill runs.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes with upstream backfill dependencies start
                // paused and are resumed in dependency order.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table: tell the target table's union fragment
                // about the new upstream sink actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move (not copy) the dispatchers of the upstream fragments
                    // that now feed this job out of the edge build result.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // New actors start paused iff the graph is currently paused.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1120
    /// Builds the `Update` mutation for replacing a streaming job's fragments:
    /// merge updates for the fragments whose upstream is replaced, dispatcher
    /// updates for the upstream fragments feeding the new fragments, refreshed
    /// source splits and (re)assigned CDC backfill splits.
    ///
    /// Note: this *removes* the matching entries from `edges.merge_updates`
    /// and `edges.dispatchers` via `extract_if`, and may mutate
    /// `database_info` when assigning CDC backfill splits.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                // The actors to update: all actors of the old fragments, plus
                // the actors of any auto-refresh-schema sink fragments.
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1170
    /// Translate a set of fragment [`Reschedule`]s into one barrier
    /// [`Mutation::Update`].
    ///
    /// The mutation aggregates, over all rescheduled fragments:
    /// - `dispatcher_update`: for each actor of each upstream fragment, the
    ///   downstream actors added/removed on its dispatcher, plus the new vnode
    ///   hash mapping if one was produced;
    /// - `merge_update`: for each surviving actor of each downstream fragment,
    ///   the upstream actors added/removed on its merge executor;
    /// - `actor_vnode_bitmap_update`: fresh vnode bitmaps for actors whose
    ///   vnode assignment changed;
    /// - `dropped_actors`, connector `actor_splits`, and CDC table snapshot
    ///   split assignments.
    ///
    /// Returns `Ok(Some(mutation))` on success; the error path comes from the
    /// CDC backfill split assignment. `database_info` is `&mut` because
    /// `may_assign_fragment_cdc_backfill_splits` is called on it — presumably
    /// it records the assignment internally (TODO confirm at its definition).
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                // Keyed by (upstream actor, dispatcher) so each dispatcher of each
                // upstream actor gets exactly one update entry.
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // All currently known actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // The upstream fragment may itself be rescheduled concurrently.
                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is itself being removed must not
                            // be told about newly added downstream actors — it only
                            // carries the removal list.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // try_insert + unwrap: a duplicate (actor, dispatcher) key
                            // would indicate an inconsistent reschedule plan.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                // Merge-side counterpart: updates for the merge executors of the
                // downstream fragments of each rescheduled fragment.
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Actors of the downstream fragment that are themselves being
                        // removed (by a concurrent reschedule) get no merge update.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Duplicate (actor, upstream fragment) keys would indicate an
                            // inconsistent plan, hence the unwrap on try_insert.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        // Newly added upstream actors, with their host and
                                        // partial-graph id so the merge can register them.
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                // New vnode bitmaps for actors whose vnode ownership changed.
                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Connector source split assignment and CDC backfill snapshot splits
                // for the post-reschedule actor set.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // Left empty: this mutation only updates existing dispatchers.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1352
1353 pub(super) fn create_subscription_to_mutation(
1355 upstream_mv_table_id: TableId,
1356 subscription_id: SubscriptionId,
1357 ) -> Mutation {
1358 {
1359 Mutation::Add(AddMutation {
1360 actor_dispatchers: Default::default(),
1361 added_actors: vec![],
1362 actor_splits: Default::default(),
1363 pause: false,
1364 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1365 upstream_mv_table_id,
1366 subscriber_id: subscription_id.as_subscriber_id(),
1367 }],
1368 backfill_nodes_to_pause: vec![],
1369 actor_cdc_table_snapshot_splits: None,
1370 new_upstream_sinks: Default::default(),
1371 })
1372 }
1373 }
1374
1375 pub(super) fn drop_subscription_to_mutation(
1377 upstream_mv_table_id: TableId,
1378 subscription_id: SubscriptionId,
1379 ) -> Mutation {
1380 {
1381 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1382 info: vec![SubscriptionUpstreamInfo {
1383 subscriber_id: subscription_id.as_subscriber_id(),
1384 upstream_mv_table_id,
1385 }],
1386 })
1387 }
1388 }
1389
1390 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1392 {
1393 {
1394 let mut connector_props_infos = HashMap::default();
1395 for (k, v) in config {
1396 connector_props_infos.insert(
1397 k.as_raw_id(),
1398 ConnectorPropsInfo {
1399 connector_props_info: v.clone(),
1400 },
1401 );
1402 }
1403 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1404 connector_props_infos,
1405 })
1406 }
1407 }
1408 }
1409
1410 pub(super) fn refresh_to_mutation(
1412 table_id: TableId,
1413 associated_source_id: SourceId,
1414 ) -> Mutation {
1415 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1416 table_id,
1417 associated_source_id,
1418 })
1419 }
1420
1421 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1423 Mutation::ListFinish(ListFinishMutation {
1424 associated_source_id,
1425 })
1426 }
1427
1428 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1430 Mutation::LoadFinish(LoadFinishMutation {
1431 associated_source_id,
1432 })
1433 }
1434
1435 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1437 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1438 source_id: source_id.as_raw_id(),
1439 })
1440 }
1441
1442 pub(super) fn resume_backfill_to_mutation(
1444 target: &ResumeBackfillTarget,
1445 database_info: &InflightDatabaseInfo,
1446 ) -> MetaResult<Option<Mutation>> {
1447 {
1448 {
1449 let fragment_ids: HashSet<_> = match target {
1450 ResumeBackfillTarget::Job(job_id) => {
1451 database_info.backfill_fragment_ids_for_job(*job_id)?
1452 }
1453 ResumeBackfillTarget::Fragment(fragment_id) => {
1454 if !database_info.is_backfill_fragment(*fragment_id)? {
1455 return Err(MetaError::invalid_parameter(format!(
1456 "fragment {} is not a backfill node",
1457 fragment_id
1458 )));
1459 }
1460 HashSet::from([*fragment_id])
1461 }
1462 };
1463 if fragment_ids.is_empty() {
1464 warn!(
1465 ?target,
1466 "resume backfill command ignored because no backfill fragments found"
1467 );
1468 Ok(None)
1469 } else {
1470 Ok(Some(Mutation::StartFragmentBackfill(
1471 StartFragmentBackfillMutation {
1472 fragment_ids: fragment_ids.into_iter().collect(),
1473 },
1474 )))
1475 }
1476 }
1477 }
1478 }
1479
1480 pub(super) fn inject_source_offsets_to_mutation(
1482 source_id: SourceId,
1483 split_offsets: &HashMap<String, String>,
1484 ) -> Mutation {
1485 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1486 source_id: source_id.as_raw_id(),
1487 split_offsets: split_offsets.clone(),
1488 })
1489 }
1490
1491 pub(super) fn create_streaming_job_actors_to_create(
1493 info: &CreateStreamingJobCommandInfo,
1494 edges: &mut FragmentEdgeBuildResult,
1495 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1496 actor_location: &HashMap<ActorId, WorkerId>,
1497 ) -> StreamJobActorsToCreate {
1498 {
1499 {
1500 edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1501 |fragment| {
1502 let actors = stream_actors
1503 .get(&fragment.fragment_id)
1504 .into_iter()
1505 .flatten()
1506 .map(|actor| (actor, actor_location[&actor.actor_id]));
1507 (
1508 fragment.fragment_id,
1509 &fragment.nodes,
1510 actors,
1511 [], )
1513 },
1514 ))
1515 }
1516 }
1517 }
1518
    /// Build the per-worker actor-creation plan for a reschedule: every newly
    /// created actor, together with its upstream actor infos, its dispatchers,
    /// its fragment's node plan and the fragment's subscribers.
    ///
    /// Returns a map `worker -> fragment -> (nodes, actors, subscribers)`,
    /// where `actors` is a list of `(actor, upstreams, dispatchers)` triples.
    pub(super) fn reschedule_actors_to_create(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> StreamJobActorsToCreate {
        {
            {
                // Upstream actor infos for each newly created actor, derived from the
                // new actors' dispatchers plus the reschedule's dispatcher updates.
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                // Flatten to (fragment, (actor, dispatchers), worker) per new actor.
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    // Each new actor's upstreams are consumed exactly once; actors
                    // with no recorded upstreams fall back to the default (empty) map.
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            // First actor of this fragment on this worker: record the
                            // fragment's node plan and subscriber set once.
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                map
            }
        }
    }
1569
1570 pub(super) fn replace_stream_job_actors_to_create(
1572 replace_table: &ReplaceStreamJobPlan,
1573 edges: &mut FragmentEdgeBuildResult,
1574 database_info: &InflightDatabaseInfo,
1575 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1576 actor_location: &HashMap<ActorId, WorkerId>,
1577 ) -> StreamJobActorsToCreate {
1578 {
1579 {
1580 let mut actors = edges.collect_actors_to_create(
1581 replace_table
1582 .new_fragments
1583 .fragments
1584 .values()
1585 .map(|fragment| {
1586 let actors = stream_actors
1587 .get(&fragment.fragment_id)
1588 .into_iter()
1589 .flatten()
1590 .map(|actor| (actor, actor_location[&actor.actor_id]));
1591 (
1592 fragment.fragment_id,
1593 &fragment.nodes,
1594 actors,
1595 database_info
1596 .job_subscribers(replace_table.old_fragments.stream_job_id),
1597 )
1598 }),
1599 );
1600
1601 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1603 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1604 (
1605 sink.new_fragment.fragment_id,
1606 &sink.new_fragment.nodes,
1607 stream_actors
1608 .get(&sink.new_fragment.fragment_id)
1609 .into_iter()
1610 .flatten()
1611 .map(|actor| (actor, actor_location[&actor.actor_id])),
1612 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1613 )
1614 }));
1615 for (worker_id, fragment_actors) in sink_actors {
1616 actors.entry(worker_id).or_default().extend(fragment_actors);
1617 }
1618 }
1619 actors
1620 }
1621 }
1622 }
1623
1624 fn generate_update_mutation_for_replace_table(
1625 dropped_actors: impl IntoIterator<Item = ActorId>,
1626 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1627 dispatchers: FragmentActorDispatchers,
1628 split_assignment: &SplitAssignment,
1629 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1630 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1631 ) -> Option<Mutation> {
1632 let dropped_actors = dropped_actors.into_iter().collect();
1633
1634 let actor_new_dispatchers = dispatchers
1635 .into_values()
1636 .flatten()
1637 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1638 .collect();
1639
1640 let actor_splits = split_assignment
1641 .values()
1642 .flat_map(build_actor_connector_splits)
1643 .collect();
1644 Some(Mutation::Update(UpdateMutation {
1645 actor_new_dispatchers,
1646 merge_update: merge_updates.into_values().flatten().collect(),
1647 dropped_actors,
1648 actor_splits,
1649 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1650 sink_schema_change: auto_refresh_schema_sinks
1651 .as_ref()
1652 .into_iter()
1653 .flat_map(|sinks| {
1654 sinks.iter().map(|sink| {
1655 (
1656 sink.original_sink.id.as_raw_id(),
1657 PbSinkSchemaChange {
1658 original_schema: sink
1659 .original_sink
1660 .columns
1661 .iter()
1662 .map(|col| PbField {
1663 data_type: Some(
1664 col.column_desc
1665 .as_ref()
1666 .unwrap()
1667 .column_type
1668 .as_ref()
1669 .unwrap()
1670 .clone(),
1671 ),
1672 name: col.column_desc.as_ref().unwrap().name.clone(),
1673 })
1674 .collect(),
1675 op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1676 fields: sink
1677 .newly_add_fields
1678 .iter()
1679 .map(|field| field.to_prost())
1680 .collect(),
1681 })),
1682 },
1683 )
1684 })
1685 })
1686 .collect(),
1687 ..Default::default()
1688 }))
1689 }
1690}
1691
impl Command {
    /// Compute, for every downstream actor touched by the given dispatchers
    /// (and, optionally, a reschedule), the upstream actors it should know:
    /// a map `downstream actor -> upstream fragment -> upstream actor infos`.
    ///
    /// Two passes feed the result:
    /// 1. `actor_dispatchers`: every downstream actor listed in an upstream
    ///    actor's dispatchers gains that upstream actor.
    /// 2. `reschedule_dispatcher_update` (if provided): for each rescheduled
    ///    fragment, every *surviving* actor of each of its upstream fragments
    ///    is added as an upstream of every newly added actor of the fragment.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Pass 1: derive upstream infos directly from the provided dispatchers.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Resolve the worker and host the upstream actor lives on.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Pass 2: for reschedules, connect surviving upstream actors to the
        // newly added actors of each rescheduled fragment.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled concurrently.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors that the upstream fragment's own
                        // reschedule is removing.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        // Every newly added actor of this fragment gains this
                        // surviving upstream actor.
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}