use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, build_pb_actor_cdc_table_snapshot_splits,
};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
    Dispatchers, DropSubscriptionsMutation, LoadFinishMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
    JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, build_actor_connector_splits,
};

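/// [`Reschedule`] describes how a single fragment is rescheduled during scaling or actor
/// migration: which actors are added or removed, and how the surrounding dispatchers,
/// vnode bitmaps and source splits must be updated accordingly.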
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, grouped by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatchers to be updated, if the fragment is
    /// hash-distributed.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, with their dispatchers and target workers.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

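/// Replacing an old streaming job with a new one. All actors in the job are rebuilt, and
/// the `Merge` executors of downstream jobs are re-pointed to the new fragments. Used for
/// `ALTER` statements and for creating a sink into an existing table.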
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` nodes to
    /// connect to the new fragments.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The split assignment for the source actors of the new fragments.
    pub init_split_assignment: SplitAssignment,
    /// The streaming job being replaced.
    pub streaming_job: StreamingJob,
    /// The temporary placeholder job id used for the new fragments before the replacement
    /// is committed.
    pub tmp_id: u32,
    /// The state table ids to be dropped together with the old fragments.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id().into(),
                info: new_fragment,
                is_existing: false,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: TableId::new(sink.original_sink.id as _),
                    info: sink.new_fragment_info(),
                    is_existing: false,
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

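/// The information of a streaming job to create, carried by [`Command::CreateStreamingJob`].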
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

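/// Tracks the upstream materialized views that a snapshot-backfill job depends on.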
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` of the upstream MV -> the epoch to backfill from. The epoch is `None`
    /// when the command is first built, and is determined later when the creation
    /// command is actually handled.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

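/// [`Command`] is the action that the barrier manager attaches to a barrier. It is
/// broadcast to all affected streaming actors as a barrier [`Mutation`] when the barrier
/// is injected, and applied to the cluster metadata once the barrier is collected.
///
/// A hedged sketch of how a command relates to checkpointing (illustrative only; real
/// commands are issued through the barrier scheduler):
///
/// ```ignore
/// let command = Command::pause();
/// // Every command except `Resume` forces the next barrier to be a checkpoint.
/// assert!(command.need_checkpoint());
/// ```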
#[derive(Debug)]
pub enum Command {
    /// `Flush` forces a checkpoint on the next barrier, with no other effect.
    Flush,

    /// `Pause` pauses all sources and thus the whole stream graph.
    Pause,

    /// `Resume` resumes a paused stream graph.
    Resume,

    /// `DropStreamingJobs` stops the actors of the given jobs and unregisters their
    /// state tables and fragments.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` adds the actors of a new job to the stream graph via an
    /// `Add` mutation on the barrier.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    /// Merges finished snapshot-backfill jobs back into the main graph by dropping their
    /// temporary subscriptions on the upstream materialized views.
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `RescheduleFragment` adds or removes actors of some fragments, used for scaling.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        /// Should contain the actor ids of the upstream and downstream fragments of
        /// `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` rebuilds a job in place; see [`ReplaceStreamJobPlan`].
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` reassigns connector splits to source actors.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` updates the rate limits of the given actors.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` registers a subscription on an upstream materialized view.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` removes a subscription from an upstream materialized view.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `StartFragmentBackfill` kicks off backfill for fragments whose backfill-order
    /// dependencies have been satisfied.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    /// `Refresh` reloads a refreshable table from its associated source.
    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}

impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    table_fragments_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit(_) => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    /// Generate the command to cancel a creating job by dropping everything it has
    /// registered so far.
    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

    /// The changes this command applies to the in-flight fragment infos, or `None` if it
    /// changes no fragment.
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id().into(),
                                info: fragment_info,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // Skip newly created actors; their bitmaps are
                                        // already carried in `new_actors`.
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
            Command::Refresh { .. } => None,
            Command::LoadFinish { .. } => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // Currently, `Resume` is the only command that does not force a checkpoint.
        !matches!(self, Command::Resume)
    }
}

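/// The kind of barrier. `Initial` is the first barrier injected after recovery or boot;
/// `Barrier` is an ordinary barrier; `Checkpoint` additionally commits data to the state
/// store and carries the prev epochs of all barriers since the last checkpoint.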
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

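/// [`CommandContext`] is the context of a command to be handled: the barrier it is
/// attached to, the in-flight subscriptions, and the tables to commit at this barrier.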
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of the command. Kept alive (but otherwise unused) so that the
    /// span is closed only when the command is fully handled.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

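    /// Compute the epoch up to which the log of a subscribed MV can be truncated, by
    /// subtracting the retention duration from the timestamp of `prev_epoch`. Falls back
    /// to `prev_epoch` itself if the subtraction leaves the valid timestamp range.
    ///
    /// A hedged sketch of the arithmetic (made-up numbers, not real epochs):
    ///
    /// ```ignore
    /// // If `prev_epoch` maps to unix time 1_000_000s and the retention is 3_600s,
    /// // the truncate epoch is derived from unix time 996_400s:
    /// let truncate_epoch = Epoch::from_unix_millis(996_400 * 1000);
    /// ```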
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

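    /// Collect the commit info of this barrier into `info` from the
    /// `BarrierCompleteResponse`s of all workers, including synced SSTs, table
    /// watermarks, change-log deltas and vector index changes.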
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts, vector_index_adds) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each subscribed MV, keep the minimum truncate epoch across all of its
        // subscriptions and all backfill jobs that pin its log.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id.into(),
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
    }
}

impl Command {
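    /// Generate the barrier [`Mutation`] to broadcast for this command, or `None` if the
    /// command requires no mutation. `is_currently_paused` tells whether the stream graph
    /// is paused, and `edges` carries the pre-built dispatcher/merge updates for commands
    /// that create or replace actors.
    ///
    /// A hedged sketch of the pause/resume behaviour (illustrative only; the `manager`
    /// argument is elided):
    ///
    /// ```ignore
    /// // `Pause` is a no-op when the graph is already paused, while `Resume` takes effect.
    /// assert!(Command::Pause.to_mutation(true, &mut None, &manager).is_none());
    /// assert!(matches!(
    ///     Command::Resume.to_mutation(true, &mut None, &manager),
    ///     Some(Mutation::Resume(_))
    /// ));
    /// ```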
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        cdc_table_snapshot_split_assignment,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is currently paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                        cdc_table_snapshot_split_assignment.clone(),
                    ),
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    cdc_table_snapshot_split_assignment,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments.actor_ids(),
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                        cdc_table_snapshot_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                cdc_table_snapshot_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().into_iter().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    cdc_table_snapshot_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is about to be removed need not be
                            // connected to the newly added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index by (actor, dispatcher) to reject duplicate updates for
                            // the same dispatcher.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream actors that are themselves removed by this reschedule
                        // will be dropped and need no merge update.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all remaining actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index by (actor, fragment) to reject duplicate updates for
                            // the same merge executor.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| ActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record bitmap updates for all retained actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                    actor_cdc_table_snapshot_splits.extend(
                        build_pb_actor_cdc_table_snapshot_splits(
                            reschedule.cdc_table_snapshot_split_assignment.clone(),
                        ),
                    );
                }

                // No new dispatchers are created in the reschedule scenario; existing
                // dispatchers are updated in place via `dispatcher_update`.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

    /// Returns the new actors this command creates, grouped by worker id and fragment id,
    /// or `None` if the command creates no actor.
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // Actors of snapshot-backfill jobs are created separately.
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

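    /// Build the `Update` mutation used when replacing a job: stop the old actors, apply
    /// the merge/dispatcher updates, and carry the new split assignments.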
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: &CdcTableSnapshotSplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                cdc_table_snapshot_split_assignment.clone(),
            ),
            ..Default::default()
        }))
    }

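    /// Returns the ids of the tables dropped by this command, if any.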
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
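    /// Collect the upstream actor infos (downstream `actor_id` -> upstream fragment ->
    /// upstream actors with their hosts) for the downstream actors of the given
    /// dispatchers, additionally applying the dispatcher changes of a reschedule when
    /// `reschedule_dispatcher_update` is provided.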
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}