use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
    SplitState, UpstreamSinkInfo, build_actor_connector_splits,
};

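/// Describes the reschedule plan of a single fragment: the actors to add (per worker) and
/// remove, per-actor vnode bitmap updates, how upstream dispatchers and downstream merge
/// executors are affected, the new source split assignment, and the newly created actors
/// together with their target workers.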
#[derive(Debug, Clone)]
pub struct Reschedule {
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    pub removed_actors: HashSet<ActorId>,

    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    pub downstream_fragment_ids: Vec<FragmentId>,

    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

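/// Plan for replacing the fragments of an existing streaming job in place: the old and new
/// fragments, the upstream rewiring to apply (`replace_upstream`), the initial split
/// assignment, and the state tables to drop once the new fragments take over.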
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub tmp_id: JobId,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

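/// All the information needed to create a streaming job: the fragments to build, their
/// downstream relations, the initial split assignment, the backfill ordering, and the job
/// metadata (definition, job type, create type).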
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

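/// Information for snapshot backfill: maps each upstream materialized view table id to its
/// backfill epoch, which may still be unset (`None`) when this struct is first built.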
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

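/// [`Command`] is the action attached to a barrier. It is applied to all affected actors as
/// the barrier flows through the streaming graph, and the remaining bookkeeping is done
/// after the barrier is collected (see [`PostCollectCommand`]).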
#[derive(Debug)]
pub enum Command {
    Flush,

    Pause,

    Resume,

    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    ReplaceStreamJob(ReplaceStreamJobPlan),

    SourceChangeSplit(SplitState),

    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    ResetSource {
        source_id: SourceId,
    },
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle { .. } => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleFragment { reschedules, .. } => Some((
                None,
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle { .. } => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None,
            Command::ListFinish { .. } => None,
            Command::LoadFinish { .. } => None,
            Command::ResetSource { .. } => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

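/// The part of a [`Command`] that is still needed after its barrier has been collected.
/// Commands without post-collect state are reduced to a plain `Command(name)` marker.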
#[derive(Debug)]
pub enum PostCollectCommand {
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob(ReplaceStreamJobPlan),
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
}

impl PostCollectCommand {
    pub fn barrier() -> Self {
        PostCollectCommand::Command("barrier".to_owned())
    }

    pub fn command_name(&self) -> &str {
        match self {
            PostCollectCommand::Command(name) => name.as_str(),
            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
            PostCollectCommand::RescheduleFragment { .. } => "RescheduleFragment",
            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
        }
    }
}

impl Display for PostCollectCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.command_name())
    }
}

impl Command {
    pub(super) fn into_post_collect(self) -> PostCollectCommand {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => PostCollectCommand::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
            },
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => match job_type {
                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
                job_type => PostCollectCommand::CreateStreamingJob {
                    info,
                    job_type,
                    cross_db_snapshot_backfill_info,
                },
            },
            Command::RescheduleFragment { reschedules, .. } => {
                PostCollectCommand::RescheduleFragment { reschedules }
            }
            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
            Command::SourceChangeSplit(SplitState { split_assignment }) => {
                PostCollectCommand::SourceChangeSplit { split_assignment }
            }
            Command::CreateSubscription {
                subscription_id, ..
            } => PostCollectCommand::CreateSubscription { subscription_id },
            Command::ConnectorPropsChange(connector_props_change) => {
                PostCollectCommand::ConnectorPropsChange(connector_props_change)
            }
            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
            Command::DropSubscription { .. } => {
                PostCollectCommand::Command("DropSubscription".to_owned())
            }
            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
        }
    }
}

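/// The kind of a barrier: the first barrier after recovery (`Initial`), a regular barrier,
/// or a checkpoint barrier carrying the epochs to be committed at this checkpoint.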
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

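/// Context kept for one injected barrier: the barrier info, the tables whose state is
/// committed at this barrier, the post-collect command to run afterwards, and the per-MV
/// subscription retention used to compute log-store truncation epochs.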
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command.command_name())
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

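    /// Computes the epoch up to which a subscribed MV's log store may be truncated:
    /// the wall-clock time of `prev_epoch` minus `retention_second`. Falls back to
    /// `prev_epoch` itself when the subtraction leaves the valid timestamp range.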
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
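    /// Generates the [`Mutation`] to attach to the barrier for this command, or `None`
    /// if the command does not need to mutate any actor state.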
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let database_id = database_info.database_id;
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle { config, .. } => {
                let config = config.clone();
                Some(Mutation::Throttle(ThrottleMutation {
                    fragment_throttle: config,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
                risingwave_pb::stream_plan::ResetSourceMutation {
                    source_id: source_id.as_raw_id(),
                },
            )),
        };
        Ok(mutation)
    }

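    /// Returns the new actors to build, grouped by worker and fragment, for commands that
    /// create actors (job creation, reschedule, stream job replacement); `None` otherwise.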
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [],
                        )
                    },
                )))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

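    /// Builds the [`UpdateMutation`] for a stream job replacement: drops the old actors,
    /// applies the merge and dispatcher updates that rewire the graph to the new fragments,
    /// and carries the new split assignment plus any sink schema changes.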
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }
}

impl Command {
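    /// Collects, for each newly created downstream actor, the upstream actors (with host
    /// and partial graph id) it should know about, derived from the given dispatchers and,
    /// for reschedules, from the surviving actors of the upstream fragments.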
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}