risingwave_meta/barrier/command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::iter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
    ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
    JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
    build_actor_connector_splits,
};

/// [`Reschedule`] is for [`Command::RescheduleFragment`], which is used for rescheduling the
/// actors of a fragment, e.g. for scaling or migration.
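///
/// For example (illustrative): scaling a fragment whose actors `{1, 2}` run on worker `A` so
/// that it instead runs actors `{2, 3}` on workers `A` and `B` would yield
/// `added_actors = {B: [3]}`, `removed_actors = {1}`, plus updated vnode bitmaps for the
/// affected actors.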
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    pub cdc_table_id: Option<u32>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` nodes to
    /// connect to the new fragments.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actors will also be rebuilt with new
    /// actor ids. We need to reassign splits for them.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for tables with connectors, so we
    /// don't need to worry about `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job id of the new table fragments.
    pub tmp_id: u32,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
146
147impl ReplaceStreamJobPlan {
148    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
149        let mut fragment_changes = HashMap::new();
150        for (fragment_id, new_fragment) in self
151            .new_fragments
152            .new_fragment_info(&self.init_split_assignment)
153        {
154            let fragment_change = CommandFragmentChanges::NewFragment {
155                job_id: self.streaming_job.id().into(),
156                info: new_fragment,
157                is_existing: false,
158            };
159            fragment_changes
160                .try_insert(fragment_id, fragment_change)
161                .expect("non-duplicate");
162        }
163        for fragment in self.old_fragments.fragments.values() {
164            fragment_changes
165                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
166                .expect("non-duplicate");
167        }
168        for (fragment_id, replace_map) in &self.replace_upstream {
169            fragment_changes
170                .try_insert(
171                    *fragment_id,
172                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
173                )
174                .expect("non-duplicate");
175        }
176        if let Some(sinks) = &self.auto_refresh_schema_sinks {
177            for sink in sinks {
178                let fragment_change = CommandFragmentChanges::NewFragment {
179                    job_id: TableId::new(sink.original_sink.id as _),
180                    info: sink.new_fragment_info(),
181                    is_existing: false,
182                };
183                fragment_changes
184                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
185                    .expect("non-duplicate");
186                fragment_changes
187                    .try_insert(
188                        sink.original_fragment.fragment_id,
189                        CommandFragmentChanges::RemoveFragment,
190                    )
191                    .expect("non-duplicate");
192            }
193        }
194        fragment_changes
195    }
196
197    /// `old_fragment_id` -> `new_fragment_id`
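    ///
    /// For example (illustrative): if a downstream `Merge` previously read from fragment `10`
    /// and now reads from its replacement fragment `20`, the returned map contains `10 -> 20`.
    /// A fragment must not be replaced by more than one fragment (asserted below).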
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

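            // Each actor takes (and removes) its own entry from the fragment-level split
            // assignment; actors without an entry get no splits.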
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` is `None` at the beginning and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different work after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on the stack; consider boxing it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passed through.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled; then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// Barriers from which actors should be collected, and the post behavior of this command, are
    /// very similar to the `Create` and `Drop` commands, for added and removed actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        // Used for updating additional metadata after the barrier ends
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, with the upstream fragments
    /// of the `Merge` executors additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
    /// materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
    /// materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `StartFragmentBackfill` command triggers backfilling for the scans specified by `fragment_id`.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    /// `Refresh` command generates a barrier to refresh a table by truncating its state
    /// and reloading data from the source.
    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}

// For debugging and observability purposes. Can add more details later if needed.
impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

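    /// Returns the per-fragment changes that this command applies to the inflight graph info,
    /// or `None` if the command does not change the fragment structure.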
    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some(changes)
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id().into(),
                                info: fragment_info,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

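                // For `CREATE SINK INTO table`, the downstream (table) fragment additionally
                // gains the new sink fragment as an upstream.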
                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some(
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints
        !matches!(self, Command::Resume)
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the preceding non-checkpoint barriers plus the current prev-epoch.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating the barrier and doing post-collection work according
/// to the given [`Command`].
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

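    /// Computes the epoch to which a subscribed MV's log store may be truncated: roughly the
    /// prev epoch's wall-clock time minus `retention_second`, converted back to an [`Epoch`].
    /// Falls back to the prev epoch itself if the subtraction yields an invalid timestamp.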
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

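        // A newly created streaming job (other than snapshot backfill) contributes its MV state
        // table and internal state tables as a new table fragment info for this commit.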
        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
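        // Keep the minimum truncate epoch per MV table, so that the log store is never truncated
        // past what any subscriber or pinned backfill still needs.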
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
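        // For a newly created job, additionally register an `Init` delta for every vector index
        // table written by one of its `VectorIndexWrite` nodes.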
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id.into(),
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the `dispatcher`s of the `DispatchExecutor`s and the `actor_upstreams`
    /// of the `MergeNode`s.
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused with the same reason.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        cdc_table_snapshot_split_assignment,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
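                // For snapshot backfill, the creating job subscribes to its upstream MVs so that
                // their log stores retain the old values needed during backfill.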
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = table_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits:
                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
                            cdc_table_snapshot_split_assignment.clone(),
                        )
                        .into(),
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                cdc_table_snapshot_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let cdc_table_snapshot_split_assignment =
                    if cdc_table_snapshot_split_assignment.is_empty() {
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                    } else {
                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
                            cdc_table_snapshot_split_assignment.clone(),
                            control_stream_manager
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
                        )
                    };
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().into_iter().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    cdc_table_snapshot_split_assignment,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
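                // Build `DispatcherUpdate`s for the upstream fragments and `MergeUpdate`s for the
                // downstream fragments of each rescheduled fragment, then collect vnode bitmap
                // updates, dropped actors and split reassignments into a single `UpdateMutation`.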
1112                let mut dispatcher_update = HashMap::new();
1113                for reschedule in reschedules.values() {
1114                    for &(upstream_fragment_id, dispatcher_id) in
1115                        &reschedule.upstream_fragment_dispatcher_ids
1116                    {
1117                        // Find the actors of the upstream fragment.
1118                        let upstream_actor_ids = fragment_actors
1119                            .get(&upstream_fragment_id)
1120                            .expect("should contain");
1121
1122                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1123
1124                        // Record updates for all actors.
1125                        for &actor_id in upstream_actor_ids {
1126                            let added_downstream_actor_id = if upstream_reschedule
1127                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1128                                .unwrap_or(true)
1129                            {
1130                                reschedule
1131                                    .added_actors
1132                                    .values()
1133                                    .flatten()
1134                                    .cloned()
1135                                    .collect()
1136                            } else {
1137                                Default::default()
1138                            };
1139                            // Index with the dispatcher id to check duplicates.
1140                            dispatcher_update
1141                                .try_insert(
1142                                    (actor_id, dispatcher_id),
1143                                    DispatcherUpdate {
1144                                        actor_id,
1145                                        dispatcher_id,
1146                                        hash_mapping: reschedule
1147                                            .upstream_dispatcher_mapping
1148                                            .as_ref()
1149                                            .map(|m| m.to_protobuf()),
1150                                        added_downstream_actor_id,
1151                                        removed_downstream_actor_id: reschedule
1152                                            .removed_actors
1153                                            .iter()
1154                                            .cloned()
1155                                            .collect(),
1156                                    },
1157                                )
1158                                .unwrap();
1159                        }
1160                    }
1161                }
1162                let dispatcher_update = dispatcher_update.into_values().collect();
1163
1164                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Skip the actors removed from the downstream fragment: the newly
                        // created actors of the current fragment will not dispatch `Update`
                        // barriers to them.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the upstream fragment id to detect duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| PbActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
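                // Collect the updated source split assignments and the CDC table snapshot split
                // assignments of the rescheduled actors.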
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                let mut cdc_table_ids: HashSet<_> = HashSet::default();
                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                    actor_cdc_table_snapshot_splits.extend(
                        build_pb_actor_cdc_table_snapshot_splits(
                            reschedule.cdc_table_snapshot_split_assignment.clone(),
                        ),
                    );
                    if let Some(cdc_table_id) = reschedule.cdc_table_id {
                        cdc_table_ids.insert(cdc_table_id);
                    }
                }

                // No new dispatchers are created in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();
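                // Advance the CDC backfill generation only when there are snapshot splits to
                // reassign; otherwise send an empty assignment.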
                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
                {
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
                    )
                    .into()
                } else {
                    PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                        generation: control_stream_manager
                            .env
                            .cdc_table_backfill_tracker
                            .next_generation(cdc_table_ids.into_iter()),
                    }
                    .into()
                };
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                    sink_add_columns: Default::default(),
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

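    /// Collects the new actors that this command requires to be created, grouped by worker and
    /// fragment, together with each actor's upstream info and dispatchers.
    ///
    /// Only `CreateStreamingJob` (except snapshot backfill), `RescheduleFragment` and
    /// `ReplaceStreamJob` introduce new actors; all other commands return `None`.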
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    // For snapshot backfill, the actors to create are handled separately.
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
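                // Group the newly created actors by worker id and then by fragment id, attaching
                // the fragment node, the collected upstreams and the dispatchers of each actor.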
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
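                // Actors of the sink fragments rewritten for auto schema refresh are created
                // together with the new fragments of the replaced job.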
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

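    /// Builds the `Update` mutation used when replacing a stream job: it drops the old actors,
    /// applies the merge updates and new dispatchers, and carries the split assignments, the CDC
    /// table snapshot split assignment, and the added sink columns for auto schema refresh.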
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    cdc_table_snapshot_split_assignment,
                )
                .into(),
            sink_add_columns: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id,
                            PbSinkAddColumns {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to be dropped.
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
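    /// Collects, for each downstream actor, the upstream actors (with their host info) grouped
    /// by upstream fragment id, derived from the given dispatchers and, for reschedules, from
    /// the surviving actors of the upstream fragments.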
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
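        // For reschedules, newly added downstream actors also need the info of the existing
        // upstream actors that survive the reschedule (i.e. those not removed by the upstream
        // fragment's own reschedule).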
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}