use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
    SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

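/// Changes to apply to a single fragment when it is rescheduled: the actors added per worker, the
/// actors removed, per-actor vnode bitmap updates, the upstream dispatchers and downstream
/// fragments that must be notified of the change, new source split assignments, and the newly
/// created actors together with the worker each one is placed on.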
#[derive(Debug, Clone)]
pub struct Reschedule {
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    pub removed_actors: HashSet<ActorId>,

    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    pub downstream_fragment_ids: Vec<FragmentId>,

    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

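/// Plan for replacing the fragments of an existing streaming job (e.g. when its definition or
/// schema changes): the old fragments to tear down, the new fragments to build, how downstream
/// fragments switch their upstream edges, the initial source split assignment, and bookkeeping
/// such as the temporary job id and the state tables to drop afterwards.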
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub tmp_id: JobId,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
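    /// Computes the per-fragment changes introduced by this plan: every new fragment is added,
    /// every old fragment is removed, and fragments listed in `replace_upstream` (as well as
    /// auto-refresh-schema sinks, if any) have their upstream edges rewritten.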
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

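    /// Returns the mapping from each replaced (original) fragment id to the fragment id that
    /// replaces it, asserting that no fragment is replaced by more than one fragment.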
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

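/// Everything needed to issue a `CreateStreamingJob` command: the fragments to create, their
/// downstream relations, initial split assignments, the backfill ordering, optional CDC snapshot
/// splits, and the job definition and metadata.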
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}

impl StreamJobFragments {
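    /// Builds an `InflightFragmentInfo` for every fragment of this job, attaching each actor's
    /// worker location, vnode bitmap, and the source splits assigned to it (if any).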
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

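/// The upstream materialized-view tables that a snapshot-backfill job reads from, each mapped to
/// its backfill epoch once that epoch is determined (`None` beforehand).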
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

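/// How a streaming job is created: a regular job, a sink that writes into an existing table, or a
/// job whose backfill reads a consistent snapshot of its upstream materialized views.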
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

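/// Commands attached to barriers by the meta barrier manager. Each variant determines the
/// mutation carried by the barrier (see [`Command::to_mutation`]) and how the in-flight database
/// state is updated once the barrier is collected (see [`Command::fragment_changes`] and
/// [`Command::into_post_collect`]).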
#[derive(Debug)]
pub enum Command {
    Flush,

    Pause,

    Resume,

    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    ReplaceStreamJob(ReplaceStreamJobPlan),

    SourceChangeSplit(SplitState),

    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    ResetSource {
        source_id: SourceId,
    },

    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}

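/// The scope of a `ResumeBackfill` command: either every backfill fragment of a job, or a single
/// backfill fragment.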
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    Job(JobId),
    Fragment(FragmentId),
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle { .. } => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
            Command::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    write!(f, "ResumeBackfill: job={job_id}")
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    write!(f, "ResumeBackfill: fragment={fragment_id}")
                }
            },
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

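    /// Returns the fragment changes that this command applies to the in-flight database state, or
    /// `None` if the command leaves fragments untouched. For `CreateStreamingJob`, the first
    /// element of the returned tuple also carries the job id and, when parallel CDC backfill is
    /// involved, a freshly created `CdcTableBackfillTracker`.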
    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleFragment { reschedules, .. } => Some((
                None,
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle { .. } => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None,
            Command::ListFinish { .. } => None,
            Command::LoadFinish { .. } => None,
            Command::ResetSource { .. } => None,
            Command::ResumeBackfill { .. } => None,
        }
    }

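    /// Whether the barrier carrying this command must be a checkpoint barrier. Currently every
    /// command except `Resume` forces a checkpoint, e.g. `Command::resume().need_checkpoint()` is
    /// `false` while `Command::pause().need_checkpoint()` is `true`.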
    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

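/// The part of a [`Command`] retained after its barrier has been collected, used for whatever
/// work remains at that point. Commands that need no dedicated post-collection handling are
/// reduced to a plain name via [`PostCollectCommand::Command`].
///
/// A minimal sketch of the naming helpers (illustrative only):
/// ```ignore
/// assert_eq!(PostCollectCommand::barrier().command_name(), "barrier");
/// ```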
#[derive(Debug)]
pub enum PostCollectCommand {
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob(ReplaceStreamJobPlan),
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}

impl PostCollectCommand {
    pub fn barrier() -> Self {
        PostCollectCommand::Command("barrier".to_owned())
    }

    pub fn command_name(&self) -> &str {
        match self {
            PostCollectCommand::Command(name) => name.as_str(),
            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
            PostCollectCommand::RescheduleFragment { .. } => "RescheduleFragment",
            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
        }
    }
}

impl Display for PostCollectCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.command_name())
    }
}

impl Command {
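    /// Converts this command into the [`PostCollectCommand`] that remains relevant after the
    /// barrier is collected; commands with no post-collection work keep only their name.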
    pub(super) fn into_post_collect(self) -> PostCollectCommand {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => PostCollectCommand::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
            },
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => match job_type {
                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
                job_type => PostCollectCommand::CreateStreamingJob {
                    info,
                    job_type,
                    cross_db_snapshot_backfill_info,
                },
            },
            Command::RescheduleFragment { reschedules, .. } => {
                PostCollectCommand::RescheduleFragment { reschedules }
            }
            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
            Command::SourceChangeSplit(SplitState { split_assignment }) => {
                PostCollectCommand::SourceChangeSplit { split_assignment }
            }
            Command::CreateSubscription {
                subscription_id, ..
            } => PostCollectCommand::CreateSubscription { subscription_id },
            Command::ConnectorPropsChange(connector_props_change) => {
                PostCollectCommand::ConnectorPropsChange(connector_props_change)
            }
            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
            Command::DropSubscription { .. } => {
                PostCollectCommand::Command("DropSubscription".to_owned())
            }
            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
            Command::ResumeBackfill { target } => PostCollectCommand::ResumeBackfill { target },
        }
    }
}

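/// The kind of a barrier: the first barrier after a fresh start or recovery (`Initial`), a
/// regular barrier, or a checkpoint barrier carrying the epochs it covers.
///
/// A minimal sketch of the helper methods (illustrative only):
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![100]);
/// assert!(kind.is_checkpoint());
/// assert!(!kind.is_initial());
/// assert_eq!(kind.as_str_name(), "Checkpoint");
/// ```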
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

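/// The context kept for a command while its barrier is in flight: the barrier info, the state
/// tables to commit at this epoch, the per-table subscription retention used to derive log-store
/// truncation epochs, the post-collect form of the command, and a tracing span held for the
/// barrier's lifetime.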
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command.command_name())
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

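    /// Computes the epoch up to which a subscribed log store may be truncated, i.e. the prev
    /// epoch of this barrier moved back by `retention_second`. Falls back to the prev epoch
    /// itself (with a warning) if the subtraction yields an invalid timestamp.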
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

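    /// Assembles the [`CommitEpochInfo`] for this checkpoint from the collected worker responses:
    /// synced SSTs, table watermarks, change-log deltas (with log-store truncation epochs derived
    /// from subscription retention and pinned backfill epochs), new table fragment infos for
    /// freshly created jobs, and vector index deltas.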
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
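    /// Builds the barrier mutation that instructs the streaming actors to carry out this command,
    /// or `None` when no mutation is needed (e.g. a plain `Flush`, or a `Pause`/`Resume` that
    /// would not change the current paused state).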
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let database_id = database_info.database_id;
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle { config, .. } => {
                let config = config.clone();
                Some(Mutation::Throttle(ThrottleMutation {
                    fragment_throttle: config,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }
            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
                risingwave_pb::stream_plan::ResetSourceMutation {
                    source_id: source_id.as_raw_id(),
                },
            )),
            Command::ResumeBackfill { target } => {
                let fragment_ids: HashSet<_> = match target {
                    ResumeBackfillTarget::Job(job_id) => {
                        database_info.backfill_fragment_ids_for_job(*job_id)?
                    }
                    ResumeBackfillTarget::Fragment(fragment_id) => {
                        if !database_info.is_backfill_fragment(*fragment_id)? {
                            return Err(MetaError::invalid_parameter(format!(
                                "fragment {} is not a backfill node",
                                fragment_id
                            )));
                        }
                        HashSet::from([*fragment_id])
                    }
                };
                if fragment_ids.is_empty() {
                    warn!(
                        ?target,
                        "resume backfill command ignored because no backfill fragments found"
                    );
                    None
                } else {
                    Some(Mutation::StartFragmentBackfill(
                        StartFragmentBackfillMutation {
                            fragment_ids: fragment_ids.into_iter().collect(),
                        },
                    ))
                }
            }
        };
        Ok(mutation)
    }

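    /// Returns the new actors that must be built for this command, grouped by worker and fragment,
    /// or `None` if the command creates no actors. Applies to job creation (except snapshot
    /// backfill, which is handled separately), rescheduling, and job replacement.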
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [],
                        )
                    },
                )))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

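    /// Builds the `Update` mutation used when replacing a table/job: drops the old actors, rewires
    /// merges and dispatchers to the new fragments, carries the initial split assignment and CDC
    /// snapshot splits, and attaches sink schema changes for auto-refresh-schema sinks.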
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }
}

impl Command {
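    /// Collects, for every downstream actor reached by the given dispatchers (and, during a
    /// reschedule, for the newly added downstream actors), the upstream actors it should connect
    /// to, keyed by downstream actor id and upstream fragment id. Upstream actors removed by the
    /// reschedule are skipped.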
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}