risingwave_meta/barrier/command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::iter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
    JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
    build_actor_connector_splits,
};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, like scaling or migrating.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatchers to be updated.
    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,

    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,

    pub cdc_table_id: Option<u32>,
}
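
// Note (added for clarity): when the barrier for [`Command::RescheduleFragment`] is built in
// `Command::to_mutation`, the fields above are translated into an `UpdateMutation`:
// `upstream_fragment_dispatcher_ids`, `added_actors` and `removed_actors` drive the dispatcher
// and merge updates of neighboring fragments, `vnode_bitmap_updates` becomes
// `actor_vnode_bitmap_update`, and `actor_splits` becomes the mutation's `actor_splits`.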

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for tables with connectors, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy ID used for the new job fragments.
    pub tmp_id: u32,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id().into(),
                info: new_fragment,
                is_existing: false,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: TableId::new(sink.original_sink.id as _),
                    info: sink.new_fragment_info(),
                    is_existing: false,
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
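    ///
    /// Added doc note with an illustrative example: if downstream fragment 5's `Merge` upstream
    /// is switched from old fragment 3 to new fragment 7, `replace_upstream` contains
    /// `{5: {3: 7}}` and this method returns `{3: 7}`.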
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `snapshot_backfill_epoch`
    /// The `snapshot_backfill_epoch` is `None` at the beginning and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and
    /// committed, all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused for the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passed through.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled; then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, and the post-collect behavior of this
    /// command, are very similar to the `Create` and `Drop` commands, for added and removed actors
    /// respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        // Used for updating additional metadata after the barrier ends
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
    /// of the `Merge` executors are additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `StartFragmentBackfill` command will trigger backfilling for specified scans by `fragment_id`.
    StartFragmentBackfill {
        fragment_ids: Vec<FragmentId>,
    },

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from the source.
    Refresh {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: TableId,
    },
}

// For debugging and observability purposes. Can add more details later if needed.
impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::MergeSnapshotBackfillStreamingJobs(_) => {
                write!(f, "MergeSnapshotBackfillStreamingJobs")
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some(changes)
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id().into(),
                                info: fragment_info,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some(
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::StartFragmentBackfill { .. } => None,
            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
        !matches!(self, Command::Resume)
    }
}
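
// A minimal illustrative sketch (added for this doc pass, not part of the original file):
// it only exercises the simple `Command` helpers defined above, namely `Display` formatting
// and the `need_checkpoint` policy, under which every command except `Resume` forces a
// checkpoint barrier.
#[cfg(test)]
mod command_example {
    use super::Command;

    #[test]
    fn display_and_checkpoint_policy() {
        // `Resume` is the only command that does not require a checkpoint.
        assert!(!Command::Resume.need_checkpoint());
        assert!(Command::Flush.need_checkpoint());
        // The `Display` impl is intended for debugging/observability.
        assert_eq!(format!("{}", Command::Pause), "Pause");
        assert_eq!(format!("{}", Command::Flush), "Flush");
    }
}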

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}
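
// Another small sketch (added for clarity; not part of the original file): a checkpoint barrier
// carries the prev-epochs of the non-checkpoint barriers it seals plus its own prev-epoch, so the
// helpers above behave as follows.
#[cfg(test)]
mod barrier_kind_example {
    use super::BarrierKind;

    #[test]
    fn checkpoint_kind_helpers() {
        // The epoch values are placeholders for illustration only.
        let kind = BarrierKind::Checkpoint(vec![100, 200, 300]);
        assert!(kind.is_checkpoint());
        assert!(!kind.is_initial());
        assert_eq!(kind.as_str_name(), "Checkpoint");
    }
}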

/// [`CommandContext`] is used for generating barriers and doing post-collect work according to the
/// given [`Command`].
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Different from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

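    /// Compute the truncate epoch for a subscribed MV's log store given a retention in seconds,
    /// i.e. roughly `prev_epoch - retention_second` in physical time.
    ///
    /// Added doc note with a worked example: if `prev_epoch` corresponds to Unix time `t` seconds
    /// and `retention_second` is 86400 (one day), the returned epoch corresponds to `t - 86400`;
    /// if that timestamp is not representable, we log a warning and fall back to `prev_epoch`
    /// itself.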
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

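        // Note (added for clarity): at this point `mv_log_store_truncate_epoch` holds, for each
        // upstream MV table, the smallest truncate epoch requested by any consumer. For example,
        // if the longest-retention subscription would allow truncating up to epoch 100 while a
        // snapshot backfill still pins epoch 80, the table's change log is truncated only up to 80.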
        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id.into(),
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the `dispatcher`s of `DispatchExecutor`s and the `actor_upstreams` of `MergeNode`s.
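    ///
    /// Added note: returns `None` when the command needs no mutation at all, e.g. `Flush`, or
    /// `Pause`/`Resume` when the cluster's pause state already matches the requested one.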
851    pub(super) fn to_mutation(
852        &self,
853        is_currently_paused: bool,
854        edges: &mut Option<FragmentEdgeBuildResult>,
855        control_stream_manager: &ControlStreamManager,
856    ) -> Option<Mutation> {
857        match self {
858            Command::Flush => None,
859
860            Command::Pause => {
861                // Only pause when the cluster is not already paused.
862                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
863                if !is_currently_paused {
864                    Some(Mutation::Pause(PauseMutation {}))
865                } else {
866                    None
867                }
868            }
869
870            Command::Resume => {
871                // Only resume when the cluster is paused with the same reason.
872                if is_currently_paused {
873                    Some(Mutation::Resume(ResumeMutation {}))
874                } else {
875                    None
876                }
877            }
878
879            Command::SourceChangeSplit(SplitState {
880                split_assignment, ..
881            }) => {
882                let mut diff = HashMap::new();
883
884                for actor_splits in split_assignment.values() {
885                    diff.extend(actor_splits.clone());
886                }
887
888                Some(Mutation::Splits(SourceChangeSplitMutation {
889                    actor_splits: build_actor_connector_splits(&diff),
890                }))
891            }
892
893            Command::Throttle(config) => {
894                let mut actor_to_apply = HashMap::new();
895                for per_fragment in config.values() {
896                    actor_to_apply.extend(
897                        per_fragment
898                            .iter()
899                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
900                    );
901                }
902
903                Some(Mutation::Throttle(ThrottleMutation {
904                    actor_throttle: actor_to_apply,
905                }))
906            }
907
908            Command::DropStreamingJobs {
909                actors,
910                dropped_sink_fragment_by_targets,
911                ..
912            } => Some(Mutation::Stop(StopMutation {
913                actors: actors.clone(),
914                dropped_sink_fragments: dropped_sink_fragment_by_targets
915                    .values()
916                    .flatten()
917                    .cloned()
918                    .collect(),
919            })),
920
921            Command::CreateStreamingJob {
922                info:
923                    CreateStreamingJobCommandInfo {
924                        stream_job_fragments: table_fragments,
925                        init_split_assignment: split_assignment,
926                        upstream_fragment_downstreams,
927                        fragment_backfill_ordering,
928                        cdc_table_snapshot_split_assignment,
929                        ..
930                    },
931                job_type,
932                ..
933            } => {
934                let edges = edges.as_mut().expect("should exist");
935                let added_actors = table_fragments.actor_ids();
936                let actor_splits = split_assignment
937                    .values()
938                    .flat_map(build_actor_connector_splits)
939                    .collect();
940                let subscriptions_to_add =
941                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
942                        job_type
943                    {
944                        snapshot_backfill_info
945                            .upstream_mv_table_id_to_backfill_epoch
946                            .keys()
947                            .map(|table_id| SubscriptionUpstreamInfo {
948                                subscriber_id: table_fragments.stream_job_id().table_id,
949                                upstream_mv_table_id: table_id.table_id,
950                            })
951                            .collect()
952                    } else {
953                        Default::default()
954                    };
955                let backfill_nodes_to_pause: Vec<_> =
956                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
957                        .into_iter()
958                        .collect();
959
960                let new_upstream_sinks =
961                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
962                        sink_fragment_id,
963                        sink_output_fields,
964                        project_exprs,
965                        new_sink_downstream,
966                        ..
967                    }) = job_type
968                    {
969                        let new_sink_actors = table_fragments
970                            .actors_to_create()
971                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
972                            .exactly_one()
973                            .map(|(_, _, actors)| {
974                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
975                                    actor_id: actor.actor_id,
976                                    host: Some(control_stream_manager.host_addr(worker_id)),
977                                })
978                            })
979                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
980                        let new_upstream_sink = PbNewUpstreamSink {
981                            info: Some(PbUpstreamSinkInfo {
982                                upstream_fragment_id: *sink_fragment_id,
983                                sink_output_schema: sink_output_fields.clone(),
984                                project_exprs: project_exprs.clone(),
985                            }),
986                            upstream_actors: new_sink_actors.collect(),
987                        };
988                        HashMap::from([(
989                            new_sink_downstream.downstream_fragment_id,
990                            new_upstream_sink.clone(),
991                        )])
992                    } else {
993                        HashMap::new()
994                    };
995
996                let add_mutation = AddMutation {
997                    actor_dispatchers: edges
998                        .dispatchers
999                        .extract_if(|fragment_id, _| {
1000                            upstream_fragment_downstreams.contains_key(fragment_id)
1001                        })
1002                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1003                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1004                        .collect(),
1005                    added_actors,
1006                    actor_splits,
1007                    // If the cluster is already paused, the new actors should be paused too.
1008                    pause: is_currently_paused,
1009                    subscriptions_to_add,
1010                    backfill_nodes_to_pause,
1011                    actor_cdc_table_snapshot_splits:
1012                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
1013                            cdc_table_snapshot_split_assignment.clone(),
1014                        )
1015                        .into(),
1016                    new_upstream_sinks,
1017                };
1018
1019                Some(Mutation::Add(add_mutation))
1020            }
1021            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
1022                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1023                    info: jobs_to_merge
1024                        .iter()
1025                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
1026                            backfill_upstream_tables
1027                                .iter()
1028                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
1029                                    subscriber_id: table_id.table_id,
1030                                    upstream_mv_table_id: upstream_table_id.table_id,
1031                                })
1032                        })
1033                        .collect(),
1034                }))
1035            }
1036
1037            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1038                old_fragments,
1039                replace_upstream,
1040                upstream_fragment_downstreams,
1041                init_split_assignment,
1042                auto_refresh_schema_sinks,
1043                cdc_table_snapshot_split_assignment,
1044                ..
1045            }) => {
1046                let edges = edges.as_mut().expect("should exist");
1047                let merge_updates = edges
1048                    .merge_updates
1049                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1050                    .collect();
1051                let dispatchers = edges
1052                    .dispatchers
1053                    .extract_if(|fragment_id, _| {
1054                        upstream_fragment_downstreams.contains_key(fragment_id)
1055                    })
1056                    .collect();
1057                let cdc_table_snapshot_split_assignment =
1058                    if cdc_table_snapshot_split_assignment.is_empty() {
1059                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1060                    } else {
1061                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
1062                            cdc_table_snapshot_split_assignment.clone(),
1063                            control_stream_manager
1064                                .env
1065                                .cdc_table_backfill_tracker
1066                                .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
1067                        )
1068                    };
1069                Self::generate_update_mutation_for_replace_table(
1070                    old_fragments.actor_ids().into_iter().chain(
1071                        auto_refresh_schema_sinks
1072                            .as_ref()
1073                            .into_iter()
1074                            .flat_map(|sinks| {
1075                                sinks.iter().flat_map(|sink| {
1076                                    sink.original_fragment
1077                                        .actors
1078                                        .iter()
1079                                        .map(|actor| actor.actor_id)
1080                                })
1081                            }),
1082                    ),
1083                    merge_updates,
1084                    dispatchers,
1085                    init_split_assignment,
1086                    cdc_table_snapshot_split_assignment,
1087                    auto_refresh_schema_sinks.as_ref(),
1088                )
1089            }
1090
1091            Command::RescheduleFragment {
1092                reschedules,
1093                fragment_actors,
1094                ..
1095            } => {
1096                let mut dispatcher_update = HashMap::new();
1097                for reschedule in reschedules.values() {
1098                    for &(upstream_fragment_id, dispatcher_id) in
1099                        &reschedule.upstream_fragment_dispatcher_ids
1100                    {
1101                        // Find the actors of the upstream fragment.
1102                        let upstream_actor_ids = fragment_actors
1103                            .get(&upstream_fragment_id)
1104                            .expect("should contain");
1105
1106                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1107
1108                        // Record updates for all actors.
1109                        for &actor_id in upstream_actor_ids {
1110                            let added_downstream_actor_id = if upstream_reschedule
1111                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1112                                .unwrap_or(true)
1113                            {
1114                                reschedule
1115                                    .added_actors
1116                                    .values()
1117                                    .flatten()
1118                                    .cloned()
1119                                    .collect()
1120                            } else {
1121                                Default::default()
1122                            };
1123                            // Index with the dispatcher id to check duplicates.
1124                            dispatcher_update
1125                                .try_insert(
1126                                    (actor_id, dispatcher_id),
1127                                    DispatcherUpdate {
1128                                        actor_id,
1129                                        dispatcher_id,
1130                                        hash_mapping: reschedule
1131                                            .upstream_dispatcher_mapping
1132                                            .as_ref()
1133                                            .map(|m| m.to_protobuf()),
1134                                        added_downstream_actor_id,
1135                                        removed_downstream_actor_id: reschedule
1136                                            .removed_actors
1137                                            .iter()
1138                                            .cloned()
1139                                            .collect(),
1140                                    },
1141                                )
1142                                .unwrap();
1143                        }
1144                    }
1145                }
1146                let dispatcher_update = dispatcher_update.into_values().collect();
1147
1148                let mut merge_update = HashMap::new();
1149                for (&fragment_id, reschedule) in reschedules {
1150                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1151                        // Find the actors of the downstream fragment.
1152                        let downstream_actor_ids = fragment_actors
1153                            .get(&downstream_fragment_id)
1154                            .expect("should contain");
1155
1156                        // Downstream removed actors should be skipped
1157                        // Newly created actors of the current fragment will not dispatch Update
1158                        // barriers to them
1159                        let downstream_removed_actors: HashSet<_> = reschedules
1160                            .get(&downstream_fragment_id)
1161                            .map(|downstream_reschedule| {
1162                                downstream_reschedule
1163                                    .removed_actors
1164                                    .iter()
1165                                    .copied()
1166                                    .collect()
1167                            })
1168                            .unwrap_or_default();
1169
1170                        // Record updates for all actors.
1171                        for &actor_id in downstream_actor_ids {
1172                            if downstream_removed_actors.contains(&actor_id) {
1173                                continue;
1174                            }
1175
1176                            // Index with the fragment id to check duplicates.
1177                            merge_update
1178                                .try_insert(
1179                                    (actor_id, fragment_id),
1180                                    MergeUpdate {
1181                                        actor_id,
1182                                        upstream_fragment_id: fragment_id,
1183                                        new_upstream_fragment_id: None,
1184                                        added_upstream_actors: reschedule
1185                                            .added_actors
1186                                            .iter()
1187                                            .flat_map(|(worker_id, actors)| {
1188                                                let host =
1189                                                    control_stream_manager.host_addr(*worker_id);
1190                                                actors.iter().map(move |actor_id| PbActorInfo {
1191                                                    actor_id: *actor_id,
1192                                                    host: Some(host.clone()),
1193                                                })
1194                                            })
1195                                            .collect(),
1196                                        removed_upstream_actor_id: reschedule
1197                                            .removed_actors
1198                                            .iter()
1199                                            .cloned()
1200                                            .collect(),
1201                                    },
1202                                )
1203                                .unwrap();
1204                        }
1205                    }
1206                }
1207                let merge_update = merge_update.into_values().collect();
1208
1209                let mut actor_vnode_bitmap_update = HashMap::new();
1210                for reschedule in reschedules.values() {
1211                    // Record updates for all actors in this fragment.
1212                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1213                        let bitmap = bitmap.to_protobuf();
1214                        actor_vnode_bitmap_update
1215                            .try_insert(actor_id, bitmap)
1216                            .unwrap();
1217                    }
1218                }
1219                let dropped_actors = reschedules
1220                    .values()
1221                    .flat_map(|r| r.removed_actors.iter().copied())
1222                    .collect();
1223                let mut actor_splits = HashMap::new();
1224                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1225                let mut cdc_table_ids: HashSet<_> = HashSet::default();
1226                for reschedule in reschedules.values() {
1227                    for (actor_id, splits) in &reschedule.actor_splits {
1228                        actor_splits.insert(
1229                            *actor_id as ActorId,
1230                            ConnectorSplits {
1231                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1232                            },
1233                        );
1234                    }
1235                    actor_cdc_table_snapshot_splits.extend(
1236                        build_pb_actor_cdc_table_snapshot_splits(
1237                            reschedule.cdc_table_snapshot_split_assignment.clone(),
1238                        ),
1239                    );
1240                    if let Some(cdc_table_id) = reschedule.cdc_table_id {
1241                        cdc_table_ids.insert(cdc_table_id);
1242                    }
1243                }
1244
1245                // We don't create new dispatchers in the reschedule scenario.
1246                let actor_new_dispatchers = HashMap::new();
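                    // Wrap the reassigned CDC snapshot splits with a generation number: when there
                    // are no splits, an empty assignment is used; otherwise a new generation is
                    // taken for the affected CDC tables (presumably so executors can tell fresh
                    // assignments from stale ones).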
1247                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1248                {
1249                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
1250                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1251                    )
1252                    .into()
1253                } else {
1254                    PbCdcTableSnapshotSplitsWithGeneration {
1255                        splits: actor_cdc_table_snapshot_splits,
1256                        generation: control_stream_manager
1257                            .env
1258                            .cdc_table_backfill_tracker
1259                            .next_generation(cdc_table_ids.into_iter()),
1260                    }
1261                    .into()
1262                };
1263                let mutation = Mutation::Update(UpdateMutation {
1264                    dispatcher_update,
1265                    merge_update,
1266                    actor_vnode_bitmap_update,
1267                    dropped_actors,
1268                    actor_splits,
1269                    actor_new_dispatchers,
1270                    actor_cdc_table_snapshot_splits,
1271                    sink_add_columns: Default::default(),
1272                });
1273                tracing::debug!("update mutation: {mutation:?}");
1274                Some(mutation)
1275            }
1276
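                // A subscription is registered through an otherwise-empty `Add` mutation that
                // only carries the (upstream MV table, subscriber) binding.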
1277            Command::CreateSubscription {
1278                upstream_mv_table_id,
1279                subscription_id,
1280                ..
1281            } => Some(Mutation::Add(AddMutation {
1282                actor_dispatchers: Default::default(),
1283                added_actors: vec![],
1284                actor_splits: Default::default(),
1285                pause: false,
1286                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1287                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1288                    subscriber_id: *subscription_id,
1289                }],
1290                backfill_nodes_to_pause: vec![],
1291                actor_cdc_table_snapshot_splits: Default::default(),
1292                new_upstream_sinks: Default::default(),
1293            })),
1294            Command::DropSubscription {
1295                upstream_mv_table_id,
1296                subscription_id,
1297            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1298                info: vec![SubscriptionUpstreamInfo {
1299                    subscriber_id: *subscription_id,
1300                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1301                }],
1302            })),
1303            Command::ConnectorPropsChange(config) => {
1304                let mut connector_props_infos = HashMap::default();
1305                for (k, v) in config {
1306                    connector_props_infos.insert(
1307                        *k,
1308                        ConnectorPropsInfo {
1309                            connector_props_info: v.clone(),
1310                        },
1311                    );
1312                }
1313                Some(Mutation::ConnectorPropsChange(
1314                    ConnectorPropsChangeMutation {
1315                        connector_props_infos,
1316                    },
1317                ))
1318            }
1319            Command::StartFragmentBackfill { fragment_ids } => Some(
1320                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1321                    fragment_ids: fragment_ids.clone(),
1322                }),
1323            ),
1324            Command::Refresh {
1325                table_id,
1326                associated_source_id,
1327            } => Some(Mutation::RefreshStart(
1328                risingwave_pb::stream_plan::RefreshStartMutation {
1329                    table_id: table_id.table_id,
1330                    associated_source_id: associated_source_id.table_id,
1331                },
1332            )),
1333            Command::LoadFinish {
1334                table_id: _,
1335                associated_source_id,
1336            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1337                associated_source_id: associated_source_id.table_id,
1338            })),
1339        }
1340    }
1341
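        /// Collects the actors that this command requires to be created, grouped by worker id
        /// and fragment id. Returns `None` for commands that create no actors; snapshot
        /// backfill jobs are handled separately and also return `None` here.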
1342    pub(super) fn actors_to_create(
1343        &self,
1344        graph_info: &InflightDatabaseInfo,
1345        edges: &mut Option<FragmentEdgeBuildResult>,
1346        control_stream_manager: &ControlStreamManager,
1347    ) -> Option<StreamJobActorsToCreate> {
1348        match self {
1349            Command::CreateStreamingJob { info, job_type, .. } => {
1350                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1351                    // For snapshot backfill, the actors to create are handled separately.
1352                    return None;
1353                }
1354                let actors_to_create = info.stream_job_fragments.actors_to_create();
1355                let edges = edges.as_mut().expect("should exist");
1356                Some(edges.collect_actors_to_create(actors_to_create))
1357            }
1358            Command::RescheduleFragment {
1359                reschedules,
1360                fragment_actors,
1361                ..
1362            } => {
1363                let mut actor_upstreams = Self::collect_actor_upstreams(
1364                    reschedules.iter().map(|(fragment_id, reschedule)| {
1365                        (
1366                            *fragment_id,
1367                            reschedule.newly_created_actors.values().map(
1368                                |((actor, dispatchers), _)| {
1369                                    (actor.actor_id, dispatchers.as_slice())
1370                                },
1371                            ),
1372                        )
1373                    }),
1374                    Some((reschedules, fragment_actors)),
1375                    graph_info,
1376                    control_stream_manager,
1377                );
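                    // Group the newly created actors by worker and fragment; each fragment entry
                    // holds its stream node plan plus (actor, upstreams, dispatchers) triples.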
1378                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
1379                for (fragment_id, (actor, dispatchers), worker_id) in
1380                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1381                        reschedule
1382                            .newly_created_actors
1383                            .values()
1384                            .map(|(actors, status)| (*fragment_id, actors, status))
1385                    })
1386                {
1387                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1388                    map.entry(*worker_id)
1389                        .or_default()
1390                        .entry(fragment_id)
1391                        .or_insert_with(|| {
1392                            let node = graph_info.fragment(fragment_id).nodes.clone();
1393                            (node, vec![])
1394                        })
1395                        .1
1396                        .push((actor.clone(), upstreams, dispatchers.clone()));
1397                }
1398                Some(map)
1399            }
1400            Command::ReplaceStreamJob(replace_table) => {
1401                let edges = edges.as_mut().expect("should exist");
1402                let mut actors =
1403                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
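                    // Auto-refresh-schema sinks are replaced together with the table, so the
                    // actors of their new sink fragments are merged into the same per-worker map.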
1404                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1405                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1406                        (
1407                            sink.new_fragment.fragment_id,
1408                            &sink.new_fragment.nodes,
1409                            sink.new_fragment.actors.iter().map(|actor| {
1410                                (
1411                                    actor,
1412                                    sink.actor_status[&actor.actor_id]
1413                                        .location
1414                                        .as_ref()
1415                                        .unwrap()
1416                                        .worker_node_id as _,
1417                                )
1418                            }),
1419                        )
1420                    }));
1421                    for (worker_id, fragment_actors) in sink_actors {
1422                        actors.entry(worker_id).or_default().extend(fragment_actors);
1423                    }
1424                }
1425                Some(actors)
1426            }
1427            _ => None,
1428        }
1429    }
1430
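        /// Builds the `Update` mutation used when replacing a stream job's fragments: redirects
        /// downstream merges to the new fragments, drops the old actors, assigns source splits
        /// and CDC table snapshot splits to the new actors, and records the columns added to any
        /// auto-refresh-schema sinks.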
1431    fn generate_update_mutation_for_replace_table(
1432        dropped_actors: impl IntoIterator<Item = ActorId>,
1433        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1434        dispatchers: FragmentActorDispatchers,
1435        init_split_assignment: &SplitAssignment,
1436        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1437        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1438    ) -> Option<Mutation> {
1439        let dropped_actors = dropped_actors.into_iter().collect();
1440
1441        let actor_new_dispatchers = dispatchers
1442            .into_values()
1443            .flatten()
1444            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1445            .collect();
1446
1447        let actor_splits = init_split_assignment
1448            .values()
1449            .flat_map(build_actor_connector_splits)
1450            .collect();
1451        Some(Mutation::Update(UpdateMutation {
1452            actor_new_dispatchers,
1453            merge_update: merge_updates.into_values().flatten().collect(),
1454            dropped_actors,
1455            actor_splits,
1456            actor_cdc_table_snapshot_splits:
1457                build_pb_actor_cdc_table_snapshot_splits_with_generation(
1458                    cdc_table_snapshot_split_assignment,
1459                )
1460                .into(),
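                // The newly added columns of each auto-refresh-schema sink, keyed by the
                // original sink id.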
1461            sink_add_columns: auto_refresh_schema_sinks
1462                .as_ref()
1463                .into_iter()
1464                .flat_map(|sinks| {
1465                    sinks.iter().map(|sink| {
1466                        (
1467                            sink.original_sink.id,
1468                            PbSinkAddColumns {
1469                                fields: sink
1470                                    .newly_add_fields
1471                                    .iter()
1472                                    .map(|field| field.to_prost())
1473                                    .collect(),
1474                            },
1475                        )
1476                    })
1477                })
1478                .collect(),
1479            ..Default::default()
1480        }))
1481    }
1482
1483    /// Returns the ids of the streaming jobs dropped by this command, if any.
1484    pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
1485        match self {
1486            Command::DropStreamingJobs {
1487                streaming_job_ids, ..
1488            } => Some(streaming_job_ids.iter().cloned()),
1489            _ => None,
1490        }
1491        .into_iter()
1492        .flatten()
1493    }
1494}
1495
1496impl Command {
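        /// For every downstream actor, collects the upstream actors it should register, keyed by
        /// upstream fragment id and carrying each upstream actor's host address. Upstream info is
        /// derived from the given dispatchers and, for reschedules, additionally from the
        /// added/removed actor sets of the affected fragments.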
1497    #[expect(clippy::type_complexity)]
1498    pub(super) fn collect_actor_upstreams(
1499        actor_dispatchers: impl Iterator<
1500            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1501        >,
1502        reschedule_dispatcher_update: Option<(
1503            &HashMap<FragmentId, Reschedule>,
1504            &HashMap<FragmentId, HashSet<ActorId>>,
1505        )>,
1506        graph_info: &InflightDatabaseInfo,
1507        control_stream_manager: &ControlStreamManager,
1508    ) -> HashMap<ActorId, ActorUpstreams> {
1509        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1510        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1511            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1512            for (upstream_actor_id, dispatchers) in upstream_actors {
1513                let upstream_actor_location =
1514                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1515                let upstream_actor_host =
                        control_stream_manager.host_addr(upstream_actor_location);
1516                for downstream_actor_id in dispatchers
1517                    .iter()
1518                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1519                {
1520                    actor_upstreams
1521                        .entry(*downstream_actor_id)
1522                        .or_default()
1523                        .entry(upstream_fragment_id)
1524                        .or_default()
1525                        .insert(
1526                            upstream_actor_id,
1527                            PbActorInfo {
1528                                actor_id: upstream_actor_id,
1529                                host: Some(upstream_actor_host.clone()),
1530                            },
1531                        );
1532                }
1533            }
1534        }
1535        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
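                // For rescheduled fragments, the newly added downstream actors must also learn
                // about their surviving upstream actors; upstream actors that are being removed
                // are skipped below.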
1536            for reschedule in reschedules.values() {
1537                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1538                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1539                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1540                    for upstream_actor_id in fragment_actors
1541                        .get(upstream_fragment_id)
1542                        .expect("should exist")
1543                    {
1544                        let upstream_actor_location =
1545                            upstream_fragment.actors[upstream_actor_id].worker_id;
1546                        let upstream_actor_host =
1547                            control_stream_manager.host_addr(upstream_actor_location);
1548                        if let Some(upstream_reschedule) = upstream_reschedule
1549                            && upstream_reschedule
1550                                .removed_actors
1551                                .contains(upstream_actor_id)
1552                        {
1553                            continue;
1554                        }
1555                        for (_, downstream_actor_id) in
1556                            reschedule
1557                                .added_actors
1558                                .iter()
1559                                .flat_map(|(worker_id, actors)| {
1560                                    actors.iter().map(|actor| (*worker_id, *actor))
1561                                })
1562                        {
1563                            actor_upstreams
1564                                .entry(downstream_actor_id)
1565                                .or_default()
1566                                .entry(*upstream_fragment_id)
1567                                .or_default()
1568                                .insert(
1569                                    *upstream_actor_id,
1570                                    PbActorInfo {
1571                                        actor_id: *upstream_actor_id,
1572                                        host: Some(upstream_actor_host.clone()),
1573                                    },
1574                                );
1575                        }
1576                    }
1577                }
1578            }
1579        }
1580        actor_upstreams
1581    }
1582}