use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
use risingwave_pb::catalog::CreateType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::LoadedFragmentContext;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
    SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

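/// Changes applied to a single fragment during a reschedule: which actors are added (per worker)
/// or removed, how vnode bitmaps and source splits are redistributed, and which upstream and
/// downstream fragments must update their dispatchers and merge executors accordingly.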
#[derive(Debug, Clone)]
pub struct Reschedule {
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    pub removed_actors: HashSet<ActorId>,

    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    pub downstream_fragment_ids: Vec<FragmentId>,

    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

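/// A fully resolved reschedule: the per-fragment [`Reschedule`]s plus the complete actor set of
/// every fragment involved, which is needed when generating dispatcher and merge updates.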
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    pub reschedules: HashMap<FragmentId, Reschedule>,
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}

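/// Metadata preloaded for planning a reschedule: the loaded fragment context, extra per-job
/// info, and the upstream/downstream fragment topology. It is carried by
/// [`Command::RescheduleIntent`] so that the intent can later be resolved into a
/// [`ReschedulePlan`] in the global barrier worker.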
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    pub loaded: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}

impl RescheduleContext {
    pub fn empty() -> Self {
        Self {
            loaded: LoadedFragmentContext::default(),
            job_extra_info: HashMap::new(),
            upstream_fragments: HashMap::new(),
            downstream_fragments: HashMap::new(),
            downstream_relations: HashMap::new(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.loaded.is_empty()
    }

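    /// Restricts this context to a single database, returning `None` if the loaded fragment
    /// context contains nothing for `database_id`. Only jobs, fragments, and relations whose
    /// source fragment belongs to that database are kept.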
    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
        let loaded = self.loaded.for_database(database_id)?;
        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
        let fragment_ids: HashSet<FragmentId> = loaded
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.keys().copied())
            .collect();

        let job_extra_info = self
            .job_extra_info
            .iter()
            .filter(|(job_id, _)| job_ids.contains(*job_id))
            .map(|(job_id, info)| (*job_id, info.clone()))
            .collect();

        let upstream_fragments = self
            .upstream_fragments
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
            .collect();

        let downstream_fragments = self
            .downstream_fragments
            .iter()
            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
            .collect();

        let downstream_relations = self
            .downstream_relations
            .iter()
            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
            .map(|(key, relation)| (*key, relation.clone()))
            .collect();

        Some(Self {
            loaded,
            job_extra_info,
            upstream_fragments,
            downstream_fragments,
            downstream_relations,
        })
    }

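    /// Splits this context into one context per database. Jobs, fragments, and relations are
    /// routed to the database that owns them; a relation follows the database of its source
    /// fragment.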
    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
        let Self {
            loaded,
            job_extra_info,
            upstream_fragments,
            downstream_fragments,
            downstream_relations,
        } = self;

        let mut contexts: HashMap<_, _> = loaded
            .into_database_contexts()
            .into_iter()
            .map(|(database_id, loaded)| {
                (
                    database_id,
                    Self {
                        loaded,
                        job_extra_info: HashMap::new(),
                        upstream_fragments: HashMap::new(),
                        downstream_fragments: HashMap::new(),
                        downstream_relations: HashMap::new(),
                    },
                )
            })
            .collect();

        if contexts.is_empty() {
            return contexts;
        }

        let mut job_databases = HashMap::new();
        let mut fragment_databases = HashMap::new();
        for (&database_id, context) in &contexts {
            for job_id in context.loaded.job_map.keys().copied() {
                job_databases.insert(job_id, database_id);
            }
            for fragment_id in context
                .loaded
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.keys().copied())
            {
                fragment_databases.insert(fragment_id, database_id);
            }
        }

        for (job_id, info) in job_extra_info {
            if let Some(database_id) = job_databases.get(&job_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for job")
                    .job_extra_info
                    .insert(job_id, info);
            }
        }

        for (fragment_id, upstreams) in upstream_fragments {
            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for fragment")
                    .upstream_fragments
                    .insert(fragment_id, upstreams);
            }
        }

        for (fragment_id, downstreams) in downstream_fragments {
            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for fragment")
                    .downstream_fragments
                    .insert(fragment_id, downstreams);
            }
        }

        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
                contexts
                    .get_mut(&database_id)
                    .expect("database context should exist for relation source")
                    .downstream_relations
                    .insert((source_fragment_id, target_fragment_id), relation);
            }
        }

        contexts
    }
}

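/// A plan for replacing the fragments of an existing streaming job in place (e.g. when a job's
/// plan is rebuilt for a schema change): the old fragments are removed, the new ones are added,
/// and fragments that consumed the old job are re-pointed via `replace_upstream`.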
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub tmp_id: JobId,
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

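/// Everything the barrier manager needs to create a streaming job: the fragments to build,
/// their downstream relations, the initial source split assignment, backfill ordering, and
/// (for parallelized CDC table backfill) the pre-computed snapshot splits.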
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}

impl StreamJobFragments {
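    /// Builds the in-flight fragment info for every fragment of this job, attaching each actor's
    /// worker id, vnode bitmap, and the source splits assigned to it in `assignment`.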
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

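/// Upstream materialized views that a snapshot-backfill job reads from, keyed by table id; the
/// value holds the pinned backfill epoch once it has been determined.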
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

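/// [`Command`] is the action the meta barrier manager attaches to a barrier. It carries the
/// metadata changes that take effect when the barrier is collected and is turned into a
/// [`Mutation`] injected into the dataflow graph together with the barrier (see
/// [`Command::to_mutation`]); the remainder needed after collection becomes a
/// [`PostCollectCommand`].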
#[derive(Debug)]
pub enum Command {
    Flush,

    Pause,

    Resume,

    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    RescheduleIntent {
        context: RescheduleContext,
        reschedule_plan: Option<ReschedulePlan>,
    },

    ReplaceStreamJob(ReplaceStreamJobPlan),

    SourceChangeSplit(SplitState),

    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    ResetSource {
        source_id: SourceId,
    },

    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}

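/// Scope of a [`Command::ResumeBackfill`]: either all backfill fragments of one job, or a single
/// backfill fragment. A minimal illustration with hypothetical ids:
///
/// ```ignore
/// let by_job = ResumeBackfillTarget::Job(job_id);
/// let by_fragment = ResumeBackfillTarget::Fragment(fragment_id);
/// ```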
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    Job(JobId),
    Fragment(FragmentId),
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                if reschedule_plan.is_some() {
                    write!(f, "RescheduleIntent(planned)")
                } else {
                    write!(f, "RescheduleIntent")
                }
            }
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle { .. } => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
            Command::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    write!(f, "ResumeBackfill: job={job_id}")
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    write!(f, "ResumeBackfill: fragment={fragment_id}")
                }
            },
            Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            } => write!(
                f,
                "InjectSourceOffsets: {} ({} splits)",
                source_id,
                split_offsets.len()
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

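    /// Returns the changes this command applies to the in-flight fragment info once collected,
    /// together with (for job creation) the new job id and an optional CDC backfill tracker.
    /// Returns `None` if the command does not change the set of fragments. A
    /// [`Command::RescheduleIntent`] must already carry a resolved [`ReschedulePlan`] here.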
    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                let ReschedulePlan { reschedules, .. } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");
                Some((
                    None,
                    reschedules
                        .iter()
                        .map(|(fragment_id, reschedule)| {
                            (
                                *fragment_id,
                                CommandFragmentChanges::Reschedule {
                                    new_actors: reschedule
                                        .added_actors
                                        .iter()
                                        .flat_map(|(node_id, actors)| {
                                            actors.iter().map(|actor_id| {
                                                (
                                                    *actor_id,
                                                    InflightActorInfo {
                                                        worker_id: *node_id,
                                                        vnode_bitmap: reschedule
                                                            .newly_created_actors
                                                            .get(actor_id)
                                                            .expect("should exist")
                                                            .0
                                                            .0
                                                            .vnode_bitmap
                                                            .clone(),
                                                        splits: reschedule
                                                            .actor_splits
                                                            .get(actor_id)
                                                            .cloned()
                                                            .unwrap_or_default(),
                                                    },
                                                )
                                            })
                                        })
                                        .collect(),
                                    actor_update_vnode_bitmap: reschedule
                                        .vnode_bitmap_updates
                                        .iter()
                                        .filter(|(actor_id, _)| {
                                            !reschedule.newly_created_actors.contains_key(*actor_id)
                                        })
                                        .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                        .collect(),
                                    to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                    actor_splits: reschedule.actor_splits.clone(),
                                },
                            )
                        })
                        .collect(),
                ))
            }
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle { .. } => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None,
            Command::ListFinish { .. } => None,
            Command::LoadFinish { .. } => None,
            Command::ResetSource { .. } => None,
            Command::ResumeBackfill { .. } => None,
            Command::InjectSourceOffsets { .. } => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        !matches!(self, Command::Resume)
    }
}

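/// The part of a [`Command`] that is still needed after its barrier has been collected, used for
/// post-collection bookkeeping such as building the commit-epoch info. Commands that need no
/// post-processing are reduced to their name via [`PostCollectCommand::Command`].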
#[derive(Debug)]
pub enum PostCollectCommand {
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob(ReplaceStreamJobPlan),
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}

impl PostCollectCommand {
    pub fn barrier() -> Self {
        PostCollectCommand::Command("barrier".to_owned())
    }

    pub fn command_name(&self) -> &str {
        match self {
            PostCollectCommand::Command(name) => name.as_str(),
            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
            PostCollectCommand::Reschedule { .. } => "Reschedule",
            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
        }
    }
}

impl Display for PostCollectCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.command_name())
    }
}

impl Command {
    pub(super) fn into_post_collect(self) -> PostCollectCommand {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => PostCollectCommand::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
            },
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => match job_type {
                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
                job_type => PostCollectCommand::CreateStreamingJob {
                    info,
                    job_type,
                    cross_db_snapshot_backfill_info,
                },
            },
            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                let ReschedulePlan { reschedules, .. } = reschedule_plan
                    .expect("reschedule intent should be resolved in global barrier worker");
                PostCollectCommand::Reschedule { reschedules }
            }
            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
            Command::SourceChangeSplit(SplitState { split_assignment }) => {
                PostCollectCommand::SourceChangeSplit { split_assignment }
            }
            Command::CreateSubscription {
                subscription_id, ..
            } => PostCollectCommand::CreateSubscription { subscription_id },
            Command::ConnectorPropsChange(connector_props_change) => {
                PostCollectCommand::ConnectorPropsChange(connector_props_change)
            }
            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
            Command::DropSubscription { .. } => {
                PostCollectCommand::Command("DropSubscription".to_owned())
            }
            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
            Command::ResumeBackfill { target } => PostCollectCommand::ResumeBackfill { target },
            Command::InjectSourceOffsets { .. } => {
                PostCollectCommand::Command("InjectSourceOffsets".to_owned())
            }
        }
    }
}

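/// The kind of barrier to inject, mirroring [`PbBarrierKind`]. `Checkpoint` carries the epochs
/// covered since the previous checkpoint barrier.
///
/// A minimal illustration (the epoch values are made up):
///
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![65536, 131072]);
/// assert!(kind.is_checkpoint());
/// assert_eq!(kind.as_str_name(), "Checkpoint");
/// ```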
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

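/// Bundles the post-collection command with the barrier it was injected into, the tables to
/// commit at this barrier, and the per-MV subscription retention used to compute log-store
/// truncation epochs when building the commit-epoch info.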
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command.command_name())
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

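    /// Computes the epoch up to which an MV's log store may be truncated: the physical time of
    /// `prev_epoch` minus `retention_second`. Falls back to `prev_epoch` itself if the
    /// subtraction does not yield a valid timestamp.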
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

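    /// Merges the collected barrier responses into `info` for the Hummock commit: synced SSTs,
    /// table watermarks, change-log deltas (with truncation epochs derived from subscription
    /// retention and pinned backfill epochs), vector index changes, and the epoch to commit for
    /// every table owned by this barrier.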
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
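    /// Turns this command into the [`Mutation`] carried by the barrier, or `None` if no mutation
    /// is needed (e.g. `Flush`, or `Pause`/`Resume` when the pause state would not change).
    /// For job creation and replacement this also consumes the pre-built dispatcher/merge edges.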
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let database_id = database_info.database_id;
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle { config, .. } => {
                let config = config.clone();
                Some(Mutation::Throttle(ThrottleMutation {
                    fragment_throttle: config,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
                risingwave_pb::stream_plan::ResetSourceMutation {
                    source_id: source_id.as_raw_id(),
                },
            )),
            Command::ResumeBackfill { target } => {
                let fragment_ids: HashSet<_> = match target {
                    ResumeBackfillTarget::Job(job_id) => {
                        database_info.backfill_fragment_ids_for_job(*job_id)?
                    }
                    ResumeBackfillTarget::Fragment(fragment_id) => {
                        if !database_info.is_backfill_fragment(*fragment_id)? {
                            return Err(MetaError::invalid_parameter(format!(
                                "fragment {} is not a backfill node",
                                fragment_id
                            )));
                        }
                        HashSet::from([*fragment_id])
                    }
                };
                if fragment_ids.is_empty() {
                    warn!(
                        ?target,
                        "resume backfill command ignored because no backfill fragments found"
                    );
                    None
                } else {
                    Some(Mutation::StartFragmentBackfill(
                        StartFragmentBackfillMutation {
                            fragment_ids: fragment_ids.into_iter().collect(),
                        },
                    ))
                }
            }
            Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            } => Some(Mutation::InjectSourceOffsets(
                risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
                    source_id: source_id.as_raw_id(),
                    split_offsets: split_offsets.clone(),
                },
            )),
        };
        Ok(mutation)
    }

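    /// Returns the new actors this command requires, grouped by worker id and fragment id, or
    /// `None` if the command creates no actors. Covers job creation, resolved reschedules, and
    /// job replacement.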
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [],
                        )
                    },
                )))
            }
            Command::RescheduleIntent {
                reschedule_plan, ..
            } => {
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");
                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

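    /// Builds the [`UpdateMutation`] used when replacing a stream job: drops the old actors,
    /// applies the collected merge/dispatcher updates and split assignment, and (for
    /// auto-refresh-schema sinks) attaches the sink schema change.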
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }
}

impl Command {
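    /// Collects, for every downstream actor, the upstream actors (with host and partial graph id)
    /// it should register, derived from the given dispatchers and, for reschedules, from the
    /// updated upstream fragment actor sets (skipping upstream actors that are being removed).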
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}