risingwave_meta/barrier/command.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
24use risingwave_common::id::{JobId, SourceId};
25use risingwave_common::must_match;
26use risingwave_common::types::Timestamptz;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::{
31    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
32    build_pb_actor_cdc_table_snapshot_splits,
33    build_pb_actor_cdc_table_snapshot_splits_with_generation,
34};
35use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
36use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
37use risingwave_meta_model::WorkerId;
38use risingwave_pb::catalog::CreateType;
39use risingwave_pb::catalog::table::PbTableType;
40use risingwave_pb::common::PbActorInfo;
41use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
42use risingwave_pb::source::{
43    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
44};
45use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
46use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
47use risingwave_pb::stream_plan::barrier_mutation::Mutation;
48use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
49use risingwave_pb::stream_plan::stream_node::NodeBody;
50use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
51use risingwave_pb::stream_plan::update_mutation::*;
52use risingwave_pb::stream_plan::{
53    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
54    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
55    ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
56    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
57};
58use risingwave_pb::stream_service::BarrierCompleteResponse;
59use tracing::warn;
60
61use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
74};
75use crate::stream::{
76    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
77    SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
78};
79
80/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
81/// in some fragment, like scaling or migrating.
82#[derive(Debug, Clone)]
83pub struct Reschedule {
84    /// Added actors in this fragment.
85    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
86
87    /// Removed actors in this fragment.
88    pub removed_actors: HashSet<ActorId>,
89
90    /// Vnode bitmap updates for some actors in this fragment.
91    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
92
93    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
94    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
95    /// New hash mapping of the upstream dispatcher to be updated.
96    ///
97    /// This field exists only when there's an upstream fragment and the current fragment is
98    /// hash-sharded.
99    pub upstream_dispatcher_mapping: Option<ActorMapping>,
100
101    /// The downstream fragments of this fragment.
102    pub downstream_fragment_ids: Vec<FragmentId>,
103
104    /// Reassigned splits for source actors.
105    /// It becomes the `actor_splits` in [`UpdateMutation`].
106    /// `Source` and `SourceBackfill` are handled together here.
107    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
108
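    /// Actors newly built for this fragment, each paired with the worker it is scheduled on.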
109    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
110
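    /// Reassigned CDC table snapshot splits for actors in this fragment.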
111    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
112
113    pub cdc_table_job_id: Option<JobId>,
114}
115
116/// Replacing an old job with a new one. All actors in the job will be rebuilt.
117///
118/// Current use cases:
119/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
120/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
121///   will replace a table job's plan.
122#[derive(Debug, Clone)]
123pub struct ReplaceStreamJobPlan {
124    pub old_fragments: StreamJobFragments,
125    pub new_fragments: StreamJobFragmentsToCreate,
126    /// Downstream jobs of the replaced job need to update their `Merge` node to
127    /// connect to the new fragment.
128    pub replace_upstream: FragmentReplaceUpstream,
129    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
130    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
131    /// We need to reassign splits for it.
132    ///
133    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't need to worry about
134    /// `backfill_splits`.
135    pub init_split_assignment: SplitAssignment,
136    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
137    pub streaming_job: StreamingJob,
138    /// The temporary dummy job id used for the new fragments before the replacement completes.
139    pub tmp_id: JobId,
140    /// The state table ids to be dropped.
141    pub to_drop_state_table_ids: Vec<TableId>,
142    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
143    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
144}
145
146impl ReplaceStreamJobPlan {
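    /// Collects the per-fragment changes implied by this plan: the new fragments are added,
    /// the old fragments are removed, and downstream fragments get their `Merge` upstream
    /// replaced (including the auto-refresh-schema sink fragments, if any).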
147    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
148        let mut fragment_changes = HashMap::new();
149        for (fragment_id, new_fragment) in self
150            .new_fragments
151            .new_fragment_info(&self.init_split_assignment)
152        {
153            let fragment_change = CommandFragmentChanges::NewFragment {
154                job_id: self.streaming_job.id(),
155                info: new_fragment,
156                is_existing: false,
157            };
158            fragment_changes
159                .try_insert(fragment_id, fragment_change)
160                .expect("non-duplicate");
161        }
162        for fragment in self.old_fragments.fragments.values() {
163            fragment_changes
164                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
165                .expect("non-duplicate");
166        }
167        for (fragment_id, replace_map) in &self.replace_upstream {
168            fragment_changes
169                .try_insert(
170                    *fragment_id,
171                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
172                )
173                .expect("non-duplicate");
174        }
175        if let Some(sinks) = &self.auto_refresh_schema_sinks {
176            for sink in sinks {
177                let fragment_change = CommandFragmentChanges::NewFragment {
178                    job_id: sink.original_sink.id.as_job_id(),
179                    info: sink.new_fragment_info(),
180                    is_existing: false,
181                };
182                fragment_changes
183                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
184                    .expect("non-duplicate");
185                fragment_changes
186                    .try_insert(
187                        sink.original_fragment.fragment_id,
188                        CommandFragmentChanges::RemoveFragment,
189                    )
190                    .expect("non-duplicate");
191            }
192        }
193        fragment_changes
194    }
195
196    /// `old_fragment_id` -> `new_fragment_id`
197    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
198        let mut fragment_replacements = HashMap::new();
199        for (upstream_fragment_id, new_upstream_fragment_id) in
200            self.replace_upstream.values().flatten()
201        {
202            {
203                let r =
204                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
205                if let Some(r) = r {
206                    assert_eq!(
207                        *new_upstream_fragment_id, r,
208                        "one fragment is replaced by multiple fragments"
209                    );
210                }
211            }
212        }
213        fragment_replacements
214    }
215}
216
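/// Info carried by [`Command::CreateStreamingJob`] describing the streaming job to be created.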
217#[derive(educe::Educe, Clone)]
218#[educe(Debug)]
219pub struct CreateStreamingJobCommandInfo {
220    #[educe(Debug(ignore))]
221    pub stream_job_fragments: StreamJobFragmentsToCreate,
222    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
223    pub init_split_assignment: SplitAssignment,
224    pub definition: String,
225    pub job_type: StreamingJobType,
226    pub create_type: CreateType,
227    pub streaming_job: StreamingJob,
228    pub fragment_backfill_ordering: FragmentBackfillOrder,
229    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
230    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
231}
232
233impl StreamJobFragments {
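    /// Builds the in-flight fragment info for every fragment of this job, distributing each
    /// fragment's initial split assignment onto its actors.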
234    pub(super) fn new_fragment_info<'a>(
235        &'a self,
236        assignment: &'a SplitAssignment,
237    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
238        self.fragments.values().map(|fragment| {
239            let mut fragment_splits = assignment
240                .get(&fragment.fragment_id)
241                .cloned()
242                .unwrap_or_default();
243
244            (
245                fragment.fragment_id,
246                InflightFragmentInfo {
247                    fragment_id: fragment.fragment_id,
248                    distribution_type: fragment.distribution_type.into(),
249                    fragment_type_mask: fragment.fragment_type_mask,
250                    vnode_count: fragment.vnode_count(),
251                    nodes: fragment.nodes.clone(),
252                    actors: fragment
253                        .actors
254                        .iter()
255                        .map(|actor| {
256                            (
257                                actor.actor_id,
258                                InflightActorInfo {
259                                    worker_id: self
260                                        .actor_status
261                                        .get(&actor.actor_id)
262                                        .expect("should exist")
263                                        .worker_id(),
264                                    vnode_bitmap: actor.vnode_bitmap.clone(),
265                                    splits: fragment_splits
266                                        .remove(&actor.actor_id)
267                                        .unwrap_or_default(),
268                                },
269                            )
270                        })
271                        .collect(),
272                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
273                },
274            )
275        })
276    }
277}
278
279#[derive(Debug, Clone)]
280pub struct SnapshotBackfillInfo {
281    /// `table_id` -> `Some(snapshot_backfill_epoch)`
282    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and will be filled in
283    /// by the global barrier worker when handling the command.
284    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
285}
286
287#[derive(Debug, Clone)]
288pub enum CreateStreamingJobType {
289    Normal,
290    SinkIntoTable(UpstreamSinkInfo),
291    SnapshotBackfill(SnapshotBackfillInfo),
292}
293
294/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
295/// it will [build different barriers to send](Self::to_mutation),
296/// and may [do different work after the barrier is collected](CommandContext::post_collect).
297// FIXME: this enum is significantly large on stack, box it
298#[derive(Debug)]
299pub enum Command {
300    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
301    /// all messages before the checkpoint barrier should have been committed.
302    Flush,
303
304    /// `Pause` command generates a `Pause` barrier **only if**
305    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
306    Pause,
307
308    /// `Resume` command generates a `Resume` barrier **only
309    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
310    /// will be generated.
311    Resume,
312
313    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
314    /// [`Vec<ActorId>`]. The catalog has already ensured that these streaming jobs are safe to
315    /// drop, as checked by reference counting beforehand.
316    ///
317    /// Barriers from the actors to be dropped will STILL be collected.
318    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
319    /// drop actors, and then deletes the job fragments info from the meta store.
320    DropStreamingJobs {
321        streaming_job_ids: HashSet<JobId>,
322        actors: Vec<ActorId>,
323        unregistered_state_table_ids: HashSet<TableId>,
324        unregistered_fragment_ids: HashSet<FragmentId>,
325        // target_fragment -> [sink_fragments]
326        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
327    },
328
329    /// `CreateStreamingJob` command generates an `Add` barrier with the given info.
330    ///
331    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
332    /// be collected since the barrier should be passthrough.
333    ///
334    /// After the barrier is collected, these newly created actors will be marked as `Running`,
335    /// and the job fragments info is added to the meta store. However, the creation progress will
336    /// **last for a while** until the `finish` channel is signaled; only then is the state of
337    /// `TableFragments` set to `Created`.
338    CreateStreamingJob {
339        info: CreateStreamingJobCommandInfo,
340        job_type: CreateStreamingJobType,
341        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
342    },
343    MergeSnapshotBackfillStreamingJobs(
344        HashMap<JobId, (HashSet<TableId>, HashMap<FragmentId, InflightFragmentInfo>)>,
345    ),
346
347    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
348    /// Mainly used for scaling and migration.
349    ///
350    /// Which actors to collect barriers from, and the post-collection behavior of this command,
351    /// are very similar to those of the `Create` and `Drop` commands, for the added and removed actors respectively.
352    RescheduleFragment {
353        reschedules: HashMap<FragmentId, Reschedule>,
354        // Should contain the actor ids in upstream and downstream fragment of `reschedules`
355        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
356    },
357
358    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
359    /// essentially switching the downstream of the old job fragments to the new ones, and
360    /// dropping the old job fragments. Used for schema changes.
361    ///
362    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
363    /// fragments of the `Merge` executors are additionally changed.
364    ReplaceStreamJob(ReplaceStreamJobPlan),
365
366    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
367    /// changed splits.
368    SourceChangeSplit(SplitState),
369
370    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
371    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
372    Throttle(ThrottleConfig),
373
374    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
375    /// the materialize executor to start storing old values for the subscription.
376    CreateSubscription {
377        subscription_id: SubscriptionId,
378        upstream_mv_table_id: TableId,
379        retention_second: u64,
380    },
381
382    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
383    /// the materialize executor to stop storing old values when there is no
384    /// subscription depending on it.
385    DropSubscription {
386        subscription_id: SubscriptionId,
387        upstream_mv_table_id: TableId,
388    },
389
390    ConnectorPropsChange(ConnectorPropsChange),
391
392    /// `StartFragmentBackfill` command will trigger backfilling for the scans specified by `fragment_id`.
393    StartFragmentBackfill {
394        fragment_ids: Vec<FragmentId>,
395    },
396
397    /// `Refresh` command generates a barrier to refresh a table by truncating state
398    /// and reloading data from the source.
399    Refresh {
400        table_id: TableId,
401        associated_source_id: SourceId,
402    },
403    ListFinish {
404        table_id: TableId,
405        associated_source_id: SourceId,
406    },
407    LoadFinish {
408        table_id: TableId,
409        associated_source_id: SourceId,
410    },
411}
412
413// For debugging and observability purposes. Can add more details later if needed.
414impl std::fmt::Display for Command {
415    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
416        match self {
417            Command::Flush => write!(f, "Flush"),
418            Command::Pause => write!(f, "Pause"),
419            Command::Resume => write!(f, "Resume"),
420            Command::DropStreamingJobs {
421                streaming_job_ids, ..
422            } => {
423                write!(
424                    f,
425                    "DropStreamingJobs: {}",
426                    streaming_job_ids.iter().sorted().join(", ")
427                )
428            }
429            Command::CreateStreamingJob { info, .. } => {
430                write!(f, "CreateStreamingJob: {}", info.streaming_job)
431            }
432            Command::MergeSnapshotBackfillStreamingJobs(_) => {
433                write!(f, "MergeSnapshotBackfillStreamingJobs")
434            }
435            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
436            Command::ReplaceStreamJob(plan) => {
437                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
438            }
439            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
440            Command::Throttle(_) => write!(f, "Throttle"),
441            Command::CreateSubscription {
442                subscription_id, ..
443            } => write!(f, "CreateSubscription: {subscription_id}"),
444            Command::DropSubscription {
445                subscription_id, ..
446            } => write!(f, "DropSubscription: {subscription_id}"),
447            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
448            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
449            Command::Refresh {
450                table_id,
451                associated_source_id,
452            } => write!(
453                f,
454                "Refresh: {} (source: {})",
455                table_id, associated_source_id
456            ),
457            Command::ListFinish {
458                table_id,
459                associated_source_id,
460            } => write!(
461                f,
462                "ListFinish: {} (source: {})",
463                table_id, associated_source_id
464            ),
465            Command::LoadFinish {
466                table_id,
467                associated_source_id,
468            } => write!(
469                f,
470                "LoadFinish: {} (source: {})",
471                table_id, associated_source_id
472            ),
473        }
474    }
475}
476
477impl Command {
478    pub fn pause() -> Self {
479        Self::Pause
480    }
481
482    pub fn resume() -> Self {
483        Self::Resume
484    }
485
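    /// Returns the per-fragment changes implied by this command, keyed by fragment id, together
    /// with the id of the job being created (for `CreateStreamingJob`). Returns `None` for
    /// commands that do not change the fragment structure.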
486    pub(crate) fn fragment_changes(
487        &self,
488    ) -> Option<(Option<JobId>, HashMap<FragmentId, CommandFragmentChanges>)> {
489        match self {
490            Command::Flush => None,
491            Command::Pause => None,
492            Command::Resume => None,
493            Command::DropStreamingJobs {
494                unregistered_fragment_ids,
495                dropped_sink_fragment_by_targets,
496                ..
497            } => {
498                let changes = unregistered_fragment_ids
499                    .iter()
500                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
501                    .chain(dropped_sink_fragment_by_targets.iter().map(
502                        |(target_fragment, sink_fragments)| {
503                            (
504                                *target_fragment,
505                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
506                            )
507                        },
508                    ))
509                    .collect();
510
511                Some((None, changes))
512            }
513            Command::CreateStreamingJob { info, job_type, .. } => {
514                assert!(
515                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
516                    "should handle fragment changes separately for snapshot backfill"
517                );
518                let mut changes: HashMap<_, _> = info
519                    .stream_job_fragments
520                    .new_fragment_info(&info.init_split_assignment)
521                    .map(|(fragment_id, fragment_info)| {
522                        (
523                            fragment_id,
524                            CommandFragmentChanges::NewFragment {
525                                job_id: info.streaming_job.id(),
526                                info: fragment_info,
527                                is_existing: false,
528                            },
529                        )
530                    })
531                    .collect();
532
533                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
534                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
535                    changes.insert(
536                        downstream_fragment_id,
537                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
538                            upstream_fragment_id: ctx.sink_fragment_id,
539                            sink_output_schema: ctx.sink_output_fields.clone(),
540                            project_exprs: ctx.project_exprs.clone(),
541                        }),
542                    );
543                }
544
545                Some((Some(info.streaming_job.id()), changes))
546            }
547            Command::RescheduleFragment { reschedules, .. } => Some((
548                None,
549                reschedules
550                    .iter()
551                    .map(|(fragment_id, reschedule)| {
552                        (
553                            *fragment_id,
554                            CommandFragmentChanges::Reschedule {
555                                new_actors: reschedule
556                                    .added_actors
557                                    .iter()
558                                    .flat_map(|(node_id, actors)| {
559                                        actors.iter().map(|actor_id| {
560                                            (
561                                                *actor_id,
562                                                InflightActorInfo {
563                                                    worker_id: *node_id,
564                                                    vnode_bitmap: reschedule
565                                                        .newly_created_actors
566                                                        .get(actor_id)
567                                                        .expect("should exist")
568                                                        .0
569                                                        .0
570                                                        .vnode_bitmap
571                                                        .clone(),
572                                                    splits: reschedule
573                                                        .actor_splits
574                                                        .get(actor_id)
575                                                        .cloned()
576                                                        .unwrap_or_default(),
577                                                },
578                                            )
579                                        })
580                                    })
581                                    .collect(),
582                                actor_update_vnode_bitmap: reschedule
583                                    .vnode_bitmap_updates
584                                    .iter()
585                                    .filter(|(actor_id, _)| {
586                                        // only keep the existing actors
587                                        !reschedule.newly_created_actors.contains_key(actor_id)
588                                    })
589                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
590                                    .collect(),
591                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
592                                actor_splits: reschedule.actor_splits.clone(),
593                            },
594                        )
595                    })
596                    .collect(),
597            )),
598            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
599            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
600            Command::SourceChangeSplit(SplitState {
601                split_assignment, ..
602            }) => Some((
603                None,
604                split_assignment
605                    .iter()
606                    .map(|(&fragment_id, splits)| {
607                        (
608                            fragment_id,
609                            CommandFragmentChanges::SplitAssignment {
610                                actor_splits: splits.clone(),
611                            },
612                        )
613                    })
614                    .collect(),
615            )),
616            Command::Throttle(_) => None,
617            Command::CreateSubscription { .. } => None,
618            Command::DropSubscription { .. } => None,
619            Command::ConnectorPropsChange(_) => None,
620            Command::StartFragmentBackfill { .. } => None,
621            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
622            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
623            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
624        }
625    }
626
627    pub fn need_checkpoint(&self) -> bool {
628        // TODO: review the flow of different commands to reduce the number of checkpoints.
629        !matches!(self, Command::Resume)
630    }
631}
632
633#[derive(Debug, Clone)]
634pub enum BarrierKind {
635    Initial,
636    Barrier,
637    /// Holds the list of previous non-checkpoint prev-epochs plus the current prev-epoch.
638    Checkpoint(Vec<u64>),
639}
640
641impl BarrierKind {
642    pub fn to_protobuf(&self) -> PbBarrierKind {
643        match self {
644            BarrierKind::Initial => PbBarrierKind::Initial,
645            BarrierKind::Barrier => PbBarrierKind::Barrier,
646            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
647        }
648    }
649
650    pub fn is_checkpoint(&self) -> bool {
651        matches!(self, BarrierKind::Checkpoint(_))
652    }
653
654    pub fn is_initial(&self) -> bool {
655        matches!(self, BarrierKind::Initial)
656    }
657
658    pub fn as_str_name(&self) -> &'static str {
659        match self {
660            BarrierKind::Initial => "Initial",
661            BarrierKind::Barrier => "Barrier",
662            BarrierKind::Checkpoint(_) => "Checkpoint",
663        }
664    }
665}
666
667/// [`CommandContext`] is used for generating the barrier and doing post-collection work according
668/// to the given [`Command`].
669pub(super) struct CommandContext {
670    mv_subscription_max_retention: HashMap<TableId, u64>,
671
672    pub(super) barrier_info: BarrierInfo,
673
674    pub(super) table_ids_to_commit: HashSet<TableId>,
675
676    pub(super) command: Option<Command>,
677
678    /// The tracing span of this command.
679    ///
680    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
681    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
682    /// stream graph on compute nodes, and finishing its `post_collect` work.
683    _span: tracing::Span,
684}
685
686impl std::fmt::Debug for CommandContext {
687    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
688        f.debug_struct("CommandContext")
689            .field("barrier_info", &self.barrier_info)
690            .field("command", &self.command)
691            .finish()
692    }
693}
694
695impl std::fmt::Display for CommandContext {
696    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
697        write!(
698            f,
699            "prev_epoch={}, curr_epoch={}, kind={}",
700            self.barrier_info.prev_epoch.value().0,
701            self.barrier_info.curr_epoch.value().0,
702            self.barrier_info.kind.as_str_name()
703        )?;
704        if let Some(command) = &self.command {
705            write!(f, ", command={}", command)?;
706        }
707        Ok(())
708    }
709}
710
711impl CommandContext {
712    pub(super) fn new(
713        barrier_info: BarrierInfo,
714        mv_subscription_max_retention: HashMap<TableId, u64>,
715        table_ids_to_commit: HashSet<TableId>,
716        command: Option<Command>,
717        span: tracing::Span,
718    ) -> Self {
719        Self {
720            mv_subscription_max_retention,
721            barrier_info,
722            table_ids_to_commit,
723            command,
724            _span: span,
725        }
726    }
727
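    /// Computes the epoch up to which the subscription change log may be truncated, i.e. the
    /// prev epoch shifted back by `retention_second`. Falls back to the prev epoch itself if
    /// the shifted timestamp is out of range.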
728    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
729        let Some(truncate_timestamptz) = Timestamptz::from_secs(
730            self.barrier_info
731                .prev_epoch
732                .value()
733                .as_timestamptz()
734                .timestamp()
735                - retention_second as i64,
736        ) else {
737            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
738            return self.barrier_info.prev_epoch.value();
739        };
740        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
741    }
742
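    /// Aggregates the `BarrierCompleteResponse`s of this barrier into `info` for the hummock
    /// commit: synced SSTs, table watermarks, change log deltas (with truncate epochs derived
    /// from subscription retention and pinned backfill epochs), vector index deltas, and the
    /// set of tables to commit at this epoch.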
743    pub(super) fn collect_commit_epoch_info(
744        &self,
745        info: &mut CommitEpochInfo,
746        resps: Vec<BarrierCompleteResponse>,
747        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
748    ) {
749        let (
750            sst_to_context,
751            synced_ssts,
752            new_table_watermarks,
753            old_value_ssts,
754            vector_index_adds,
755            truncate_tables,
756        ) = collect_resp_info(resps);
757
758        let new_table_fragment_infos =
759            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
760                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
761            {
762                let table_fragments = &info.stream_job_fragments;
763                let mut table_ids: HashSet<_> =
764                    table_fragments.internal_table_ids().into_iter().collect();
765                if let Some(mv_table_id) = table_fragments.mv_table_id() {
766                    table_ids.insert(mv_table_id);
767                }
768
769                vec![NewTableFragmentInfo { table_ids }]
770            } else {
771                vec![]
772            };
773
774        let mut mv_log_store_truncate_epoch = HashMap::new();
775        // TODO: may collect cross db snapshot backfill
776        let mut update_truncate_epoch =
777            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
778                Entry::Occupied(mut entry) => {
779                    let prev_truncate_epoch = entry.get_mut();
780                    if truncate_epoch < *prev_truncate_epoch {
781                        *prev_truncate_epoch = truncate_epoch;
782                    }
783                }
784                Entry::Vacant(entry) => {
785                    entry.insert(truncate_epoch);
786                }
787            };
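        // Keep the minimum truncate epoch per upstream MV, across all subscriptions and pinned backfill epochs.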
788        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
789            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
790            update_truncate_epoch(*mv_table_id, truncate_epoch);
791        }
792        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
793            for mv_table_id in upstream_mv_table_ids {
794                update_truncate_epoch(mv_table_id, backfill_epoch);
795            }
796        }
797
798        let table_new_change_log = build_table_change_log_delta(
799            old_value_ssts.into_iter(),
800            synced_ssts.iter().map(|sst| &sst.sst_info),
801            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
802            mv_log_store_truncate_epoch.into_iter(),
803        );
804
805        let epoch = self.barrier_info.prev_epoch();
806        for table_id in &self.table_ids_to_commit {
807            info.tables_to_commit
808                .try_insert(*table_id, epoch)
809                .expect("non duplicate");
810        }
811
812        info.sstables.extend(synced_ssts);
813        info.new_table_watermarks.extend(new_table_watermarks);
814        info.sst_to_context.extend(sst_to_context);
815        info.new_table_fragment_infos
816            .extend(new_table_fragment_infos);
817        info.change_log_delta.extend(table_new_change_log);
818        for (table_id, vector_index_adds) in vector_index_adds {
819            info.vector_index_delta
820                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
821                .expect("non-duplicate");
822        }
823        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
824            for fragment in job_info.stream_job_fragments.fragments.values() {
825                visit_stream_node_cont(&fragment.nodes, |node| {
826                    match node.node_body.as_ref().unwrap() {
827                        NodeBody::VectorIndexWrite(vector_index_write) => {
828                            let index_table = vector_index_write.table.as_ref().unwrap();
829                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
830                            info.vector_index_delta
831                                .try_insert(
832                                    index_table.id,
833                                    VectorIndexDelta::Init(PbVectorIndexInit {
834                                        info: Some(index_table.vector_index_info.unwrap()),
835                                    }),
836                                )
837                                .expect("non-duplicate");
838                            false
839                        }
840                        _ => true,
841                    }
842                })
843            }
844        }
845        info.truncate_tables.extend(truncate_tables);
846    }
847}
848
849impl Command {
850    /// Generate a mutation for the given command.
851    ///
852    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor` and the `actor_upstreams` of `MergeNode`.
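    /// Returns `None` if the command needs no mutation (e.g. `Flush`, or `Pause` when the cluster is already paused).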
853    pub(super) fn to_mutation(
854        &self,
855        is_currently_paused: bool,
856        edges: &mut Option<FragmentEdgeBuildResult>,
857        control_stream_manager: &ControlStreamManager,
858    ) -> Option<Mutation> {
859        match self {
860            Command::Flush => None,
861
862            Command::Pause => {
863                // Only pause when the cluster is not already paused.
864                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
865                if !is_currently_paused {
866                    Some(Mutation::Pause(PauseMutation {}))
867                } else {
868                    None
869                }
870            }
871
872            Command::Resume => {
873                // Only resume when the cluster is paused with the same reason.
874                if is_currently_paused {
875                    Some(Mutation::Resume(ResumeMutation {}))
876                } else {
877                    None
878                }
879            }
880
881            Command::SourceChangeSplit(SplitState {
882                split_assignment, ..
883            }) => {
884                let mut diff = HashMap::new();
885
886                for actor_splits in split_assignment.values() {
887                    diff.extend(actor_splits.clone());
888                }
889
890                Some(Mutation::Splits(SourceChangeSplitMutation {
891                    actor_splits: build_actor_connector_splits(&diff),
892                }))
893            }
894
895            Command::Throttle(config) => {
896                let mut actor_to_apply = HashMap::new();
897                for per_fragment in config.values() {
898                    actor_to_apply.extend(
899                        per_fragment
900                            .iter()
901                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
902                    );
903                }
904
905                Some(Mutation::Throttle(ThrottleMutation {
906                    actor_throttle: actor_to_apply,
907                }))
908            }
909
910            Command::DropStreamingJobs {
911                actors,
912                dropped_sink_fragment_by_targets,
913                ..
914            } => Some(Mutation::Stop(StopMutation {
915                actors: actors.clone(),
916                dropped_sink_fragments: dropped_sink_fragment_by_targets
917                    .values()
918                    .flatten()
919                    .cloned()
920                    .collect(),
921            })),
922
923            Command::CreateStreamingJob {
924                info:
925                    CreateStreamingJobCommandInfo {
926                        stream_job_fragments: table_fragments,
927                        init_split_assignment: split_assignment,
928                        upstream_fragment_downstreams,
929                        fragment_backfill_ordering,
930                        cdc_table_snapshot_split_assignment,
931                        ..
932                    },
933                job_type,
934                ..
935            } => {
936                let edges = edges.as_mut().expect("should exist");
937                let added_actors = table_fragments.actor_ids().collect();
938                let actor_splits = split_assignment
939                    .values()
940                    .flat_map(build_actor_connector_splits)
941                    .collect();
942                let subscriptions_to_add =
943                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
944                        job_type
945                    {
946                        snapshot_backfill_info
947                            .upstream_mv_table_id_to_backfill_epoch
948                            .keys()
949                            .map(|table_id| SubscriptionUpstreamInfo {
950                                subscriber_id: table_fragments.stream_job_id().as_raw_id(),
951                                upstream_mv_table_id: *table_id,
952                            })
953                            .collect()
954                    } else {
955                        Default::default()
956                    };
957                let backfill_nodes_to_pause: Vec<_> =
958                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
959                        .into_iter()
960                        .collect();
961
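                // For `CREATE SINK INTO table`, inform the downstream table fragment of the newly created sink actors.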
962                let new_upstream_sinks =
963                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
964                        sink_fragment_id,
965                        sink_output_fields,
966                        project_exprs,
967                        new_sink_downstream,
968                        ..
969                    }) = job_type
970                    {
971                        let new_sink_actors = table_fragments
972                            .actors_to_create()
973                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
974                            .exactly_one()
975                            .map(|(_, _, actors)| {
976                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
977                                    actor_id: actor.actor_id,
978                                    host: Some(control_stream_manager.host_addr(worker_id)),
979                                })
980                            })
981                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
982                        let new_upstream_sink = PbNewUpstreamSink {
983                            info: Some(PbUpstreamSinkInfo {
984                                upstream_fragment_id: *sink_fragment_id,
985                                sink_output_schema: sink_output_fields.clone(),
986                                project_exprs: project_exprs.clone(),
987                            }),
988                            upstream_actors: new_sink_actors.collect(),
989                        };
990                        HashMap::from([(
991                            new_sink_downstream.downstream_fragment_id,
992                            new_upstream_sink,
993                        )])
994                    } else {
995                        HashMap::new()
996                    };
997
998                let add_mutation = AddMutation {
999                    actor_dispatchers: edges
1000                        .dispatchers
1001                        .extract_if(|fragment_id, _| {
1002                            upstream_fragment_downstreams.contains_key(fragment_id)
1003                        })
1004                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1005                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1006                        .collect(),
1007                    added_actors,
1008                    actor_splits,
1009                    // If the cluster is already paused, the new actors should be paused too.
1010                    pause: is_currently_paused,
1011                    subscriptions_to_add,
1012                    backfill_nodes_to_pause,
1013                    actor_cdc_table_snapshot_splits:
1014                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
1015                            cdc_table_snapshot_split_assignment.clone(),
1016                        )
1017                        .into(),
1018                    new_upstream_sinks,
1019                };
1020
1021                Some(Mutation::Add(add_mutation))
1022            }
1023            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
1024                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1025                    info: jobs_to_merge
1026                        .iter()
1027                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
1028                            backfill_upstream_tables
1029                                .iter()
1030                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
1031                                    subscriber_id: table_id.as_raw_id(),
1032                                    upstream_mv_table_id: *upstream_table_id,
1033                                })
1034                        })
1035                        .collect(),
1036                }))
1037            }
1038
1039            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1040                old_fragments,
1041                replace_upstream,
1042                upstream_fragment_downstreams,
1043                init_split_assignment,
1044                auto_refresh_schema_sinks,
1045                cdc_table_snapshot_split_assignment,
1046                ..
1047            }) => {
1048                let edges = edges.as_mut().expect("should exist");
1049                let merge_updates = edges
1050                    .merge_updates
1051                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1052                    .collect();
1053                let dispatchers = edges
1054                    .dispatchers
1055                    .extract_if(|fragment_id, _| {
1056                        upstream_fragment_downstreams.contains_key(fragment_id)
1057                    })
1058                    .collect();
1059                let cdc_table_snapshot_split_assignment =
1060                    if cdc_table_snapshot_split_assignment.is_empty() {
1061                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1062                    } else {
1063                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
1064                            cdc_table_snapshot_split_assignment.clone(),
1065                            control_stream_manager
1066                                .env
1067                                .cdc_table_backfill_tracker
1068                                .next_generation(iter::once(old_fragments.stream_job_id)),
1069                        )
1070                    };
1071                Self::generate_update_mutation_for_replace_table(
1072                    old_fragments.actor_ids().chain(
1073                        auto_refresh_schema_sinks
1074                            .as_ref()
1075                            .into_iter()
1076                            .flat_map(|sinks| {
1077                                sinks.iter().flat_map(|sink| {
1078                                    sink.original_fragment
1079                                        .actors
1080                                        .iter()
1081                                        .map(|actor| actor.actor_id)
1082                                })
1083                            }),
1084                    ),
1085                    merge_updates,
1086                    dispatchers,
1087                    init_split_assignment,
1088                    cdc_table_snapshot_split_assignment,
1089                    auto_refresh_schema_sinks.as_ref(),
1090                )
1091            }
1092
1093            Command::RescheduleFragment {
1094                reschedules,
1095                fragment_actors,
1096                ..
1097            } => {
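                // Build dispatcher updates for the upstream fragments of each rescheduled fragment.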
1098                let mut dispatcher_update = HashMap::new();
1099                for reschedule in reschedules.values() {
1100                    for &(upstream_fragment_id, dispatcher_id) in
1101                        &reschedule.upstream_fragment_dispatcher_ids
1102                    {
1103                        // Find the actors of the upstream fragment.
1104                        let upstream_actor_ids = fragment_actors
1105                            .get(&upstream_fragment_id)
1106                            .expect("should contain");
1107
1108                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1109
1110                        // Record updates for all actors.
1111                        for &actor_id in upstream_actor_ids {
1112                            let added_downstream_actor_id = if upstream_reschedule
1113                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1114                                .unwrap_or(true)
1115                            {
1116                                reschedule
1117                                    .added_actors
1118                                    .values()
1119                                    .flatten()
1120                                    .cloned()
1121                                    .collect()
1122                            } else {
1123                                Default::default()
1124                            };
1125                            // Index with the dispatcher id to check duplicates.
1126                            dispatcher_update
1127                                .try_insert(
1128                                    (actor_id, dispatcher_id),
1129                                    DispatcherUpdate {
1130                                        actor_id,
1131                                        dispatcher_id,
1132                                        hash_mapping: reschedule
1133                                            .upstream_dispatcher_mapping
1134                                            .as_ref()
1135                                            .map(|m| m.to_protobuf()),
1136                                        added_downstream_actor_id,
1137                                        removed_downstream_actor_id: reschedule
1138                                            .removed_actors
1139                                            .iter()
1140                                            .cloned()
1141                                            .collect(),
1142                                    },
1143                                )
1144                                .unwrap();
1145                        }
1146                    }
1147                }
1148                let dispatcher_update = dispatcher_update.into_values().collect();
1149
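                // Build merge updates for the downstream fragments of each rescheduled fragment.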
1150                let mut merge_update = HashMap::new();
1151                for (&fragment_id, reschedule) in reschedules {
1152                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1153                        // Find the actors of the downstream fragment.
1154                        let downstream_actor_ids = fragment_actors
1155                            .get(&downstream_fragment_id)
1156                            .expect("should contain");
1157
1158                        // Downstream removed actors should be skipped:
1159                        // newly created actors of the current fragment will not dispatch `Update`
1160                        // barriers to them.
1161                        let downstream_removed_actors: HashSet<_> = reschedules
1162                            .get(&downstream_fragment_id)
1163                            .map(|downstream_reschedule| {
1164                                downstream_reschedule
1165                                    .removed_actors
1166                                    .iter()
1167                                    .copied()
1168                                    .collect()
1169                            })
1170                            .unwrap_or_default();
1171
1172                        // Record updates for all actors.
1173                        for &actor_id in downstream_actor_ids {
1174                            if downstream_removed_actors.contains(&actor_id) {
1175                                continue;
1176                            }
1177
1178                            // Index with the fragment id to check duplicates.
1179                            merge_update
1180                                .try_insert(
1181                                    (actor_id, fragment_id),
1182                                    MergeUpdate {
1183                                        actor_id,
1184                                        upstream_fragment_id: fragment_id,
1185                                        new_upstream_fragment_id: None,
1186                                        added_upstream_actors: reschedule
1187                                            .added_actors
1188                                            .iter()
1189                                            .flat_map(|(worker_id, actors)| {
1190                                                let host =
1191                                                    control_stream_manager.host_addr(*worker_id);
1192                                                actors.iter().map(move |&actor_id| PbActorInfo {
1193                                                    actor_id,
1194                                                    host: Some(host.clone()),
1195                                                })
1196                                            })
1197                                            .collect(),
1198                                        removed_upstream_actor_id: reschedule
1199                                            .removed_actors
1200                                            .iter()
1201                                            .cloned()
1202                                            .collect(),
1203                                    },
1204                                )
1205                                .unwrap();
1206                        }
1207                    }
1208                }
1209                let merge_update = merge_update.into_values().collect();
1210
1211                let mut actor_vnode_bitmap_update = HashMap::new();
1212                for reschedule in reschedules.values() {
1213                    // Record updates for all actors in this fragment.
1214                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1215                        let bitmap = bitmap.to_protobuf();
1216                        actor_vnode_bitmap_update
1217                            .try_insert(actor_id, bitmap)
1218                            .unwrap();
1219                    }
1220                }
1221                let dropped_actors = reschedules
1222                    .values()
1223                    .flat_map(|r| r.removed_actors.iter().copied())
1224                    .collect();
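                // Gather the reassigned source splits and CDC table snapshot splits for the
                // rescheduled actors, and remember which CDC table jobs are affected.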
1225                let mut actor_splits = HashMap::new();
1226                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1227                let mut cdc_table_job_ids: HashSet<_> = HashSet::default();
1228                for reschedule in reschedules.values() {
1229                    for (actor_id, splits) in &reschedule.actor_splits {
1230                        actor_splits.insert(
1231                            *actor_id,
1232                            ConnectorSplits {
1233                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1234                            },
1235                        );
1236                    }
1237                    actor_cdc_table_snapshot_splits.extend(
1238                        build_pb_actor_cdc_table_snapshot_splits(
1239                            reschedule.cdc_table_snapshot_split_assignment.clone(),
1240                        ),
1241                    );
1242                    if let Some(cdc_table_job_id) = reschedule.cdc_table_job_id {
1243                        cdc_table_job_ids.insert(cdc_table_job_id);
1244                    }
1245                }
1246
1247                // We don't create new dispatchers in the reschedule scenario.
1248                let actor_new_dispatchers = HashMap::new();
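                // Wrap the CDC table snapshot splits with a generation: an empty assignment is
                // emitted as-is, otherwise a fresh generation is taken from the backfill tracker
                // for the affected CDC table jobs.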
1249                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
1250                {
1251                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
1252                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
1253                    )
1254                    .into()
1255                } else {
1256                    PbCdcTableSnapshotSplitsWithGeneration {
1257                        splits: actor_cdc_table_snapshot_splits,
1258                        generation: control_stream_manager
1259                            .env
1260                            .cdc_table_backfill_tracker
1261                            .next_generation(cdc_table_job_ids.into_iter()),
1262                    }
1263                    .into()
1264                };
1265                let mutation = Mutation::Update(UpdateMutation {
1266                    dispatcher_update,
1267                    merge_update,
1268                    actor_vnode_bitmap_update,
1269                    dropped_actors,
1270                    actor_splits,
1271                    actor_new_dispatchers,
1272                    actor_cdc_table_snapshot_splits,
1273                    sink_add_columns: Default::default(),
1274                });
1275                tracing::debug!("update mutation: {mutation:?}");
1276                Some(mutation)
1277            }
1278
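            // Creating a subscription only registers the new subscriber on its upstream
            // materialized view; no actors are added and nothing is paused.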
1279            Command::CreateSubscription {
1280                upstream_mv_table_id,
1281                subscription_id,
1282                ..
1283            } => Some(Mutation::Add(AddMutation {
1284                actor_dispatchers: Default::default(),
1285                added_actors: vec![],
1286                actor_splits: Default::default(),
1287                pause: false,
1288                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1289                    upstream_mv_table_id: *upstream_mv_table_id,
1290                    subscriber_id: subscription_id.as_raw_id(),
1291                }],
1292                backfill_nodes_to_pause: vec![],
1293                actor_cdc_table_snapshot_splits: Default::default(),
1294                new_upstream_sinks: Default::default(),
1295            })),
1296            Command::DropSubscription {
1297                upstream_mv_table_id,
1298                subscription_id,
1299            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1300                info: vec![SubscriptionUpstreamInfo {
1301                    subscriber_id: subscription_id.as_raw_id(),
1302                    upstream_mv_table_id: *upstream_mv_table_id,
1303                }],
1304            })),
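            // Broadcast the changed connector properties, keyed by the raw id of the object
            // whose properties changed.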
1305            Command::ConnectorPropsChange(config) => {
1306                let mut connector_props_infos = HashMap::default();
1307                for (k, v) in config {
1308                    connector_props_infos.insert(
1309                        k.as_raw_id(),
1310                        ConnectorPropsInfo {
1311                            connector_props_info: v.clone(),
1312                        },
1313                    );
1314                }
1315                Some(Mutation::ConnectorPropsChange(
1316                    ConnectorPropsChangeMutation {
1317                        connector_props_infos,
1318                    },
1319                ))
1320            }
1321            Command::StartFragmentBackfill { fragment_ids } => Some(
1322                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1323                    fragment_ids: fragment_ids.clone(),
1324                }),
1325            ),
1326            Command::Refresh {
1327                table_id,
1328                associated_source_id,
1329            } => Some(Mutation::RefreshStart(
1330                risingwave_pb::stream_plan::RefreshStartMutation {
1331                    table_id: *table_id,
1332                    associated_source_id: *associated_source_id,
1333                },
1334            )),
1335            Command::ListFinish {
1336                table_id: _,
1337                associated_source_id,
1338            } => Some(Mutation::ListFinish(ListFinishMutation {
1339                associated_source_id: *associated_source_id,
1340            })),
1341            Command::LoadFinish {
1342                table_id: _,
1343                associated_source_id,
1344            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1345                associated_source_id: *associated_source_id,
1346            })),
1347        }
1348    }
1349
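    /// Returns the new actors that this command requires, grouped by worker and fragment,
    /// together with each fragment's stream node, the actors' upstream infos and dispatchers,
    /// and the fragment's subscribers. Only `CreateStreamingJob` (except snapshot backfill,
    /// whose actors are handled separately), `RescheduleFragment` and `ReplaceStreamJob`
    /// create new actors; other commands return `None`.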
1350    pub(super) fn actors_to_create(
1351        &self,
1352        graph_info: &InflightDatabaseInfo,
1353        edges: &mut Option<FragmentEdgeBuildResult>,
1354        control_stream_manager: &ControlStreamManager,
1355    ) -> Option<StreamJobActorsToCreate> {
1356        match self {
1357            Command::CreateStreamingJob { info, job_type, .. } => {
1358                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1359                    // For snapshot backfill, the actors to create are handled separately.
1360                    return None;
1361                }
1362                let actors_to_create = info.stream_job_fragments.actors_to_create();
1363                let edges = edges.as_mut().expect("should exist");
1364                Some(edges.collect_actors_to_create(actors_to_create.map(
1365                    |(fragment_id, node, actors)| {
1366                        (
1367                            fragment_id,
1368                            node,
1369                            actors,
1370                            [], // no subscribers for a newly created job
1371                        )
1372                    },
1373                )))
1374            }
1375            Command::RescheduleFragment {
1376                reschedules,
1377                fragment_actors,
1378                ..
1379            } => {
1380                let mut actor_upstreams = Self::collect_actor_upstreams(
1381                    reschedules.iter().map(|(fragment_id, reschedule)| {
1382                        (
1383                            *fragment_id,
1384                            reschedule.newly_created_actors.values().map(
1385                                |((actor, dispatchers), _)| {
1386                                    (actor.actor_id, dispatchers.as_slice())
1387                                },
1388                            ),
1389                        )
1390                    }),
1391                    Some((reschedules, fragment_actors)),
1392                    graph_info,
1393                    control_stream_manager,
1394                );
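                // Group the newly created actors by worker and then by fragment; each fragment
                // entry carries its stream node, its actors (with their upstreams and
                // dispatchers) and its subscribers.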
1395                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1396                for (fragment_id, (actor, dispatchers), worker_id) in
1397                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1398                        reschedule
1399                            .newly_created_actors
1400                            .values()
1401                            .map(|(actors, status)| (*fragment_id, actors, status))
1402                    })
1403                {
1404                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1405                    map.entry(*worker_id)
1406                        .or_default()
1407                        .entry(fragment_id)
1408                        .or_insert_with(|| {
1409                            let node = graph_info.fragment(fragment_id).nodes.clone();
1410                            let subscribers =
1411                                graph_info.fragment_subscribers(fragment_id).collect();
1412                            (node, vec![], subscribers)
1413                        })
1414                        .1
1415                        .push((actor.clone(), upstreams, dispatchers.clone()));
1416                }
1417                Some(map)
1418            }
1419            Command::ReplaceStreamJob(replace_table) => {
1420                let edges = edges.as_mut().expect("should exist");
1421                let mut actors = edges.collect_actors_to_create(
1422                    replace_table.new_fragments.actors_to_create().map(
1423                        |(fragment_id, node, actors)| {
1424                            (
1425                                fragment_id,
1426                                node,
1427                                actors,
1428                                graph_info
1429                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1430                            )
1431                        },
1432                    ),
1433                );
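                // Sinks whose schema is auto-refreshed along with this replacement also get new
                // fragments; merge their actors into the same per-worker map.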
1434                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1435                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1436                        (
1437                            sink.new_fragment.fragment_id,
1438                            &sink.new_fragment.nodes,
1439                            sink.new_fragment.actors.iter().map(|actor| {
1440                                (
1441                                    actor,
1442                                    sink.actor_status[&actor.actor_id]
1443                                        .location
1444                                        .as_ref()
1445                                        .unwrap()
1446                                        .worker_node_id,
1447                                )
1448                            }),
1449                            graph_info.job_subscribers(sink.original_sink.id.as_job_id()),
1450                        )
1451                    }));
1452                    for (worker_id, fragment_actors) in sink_actors {
1453                        actors.entry(worker_id).or_default().extend(fragment_actors);
1454                    }
1455                }
1456                Some(actors)
1457            }
1458            _ => None,
1459        }
1460    }
1461
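    /// Builds the `Update` mutation applied when replacing a stream job's fragments: it drops
    /// the old actors, rewires downstream merges via the given merge updates, installs the new
    /// dispatchers and initial source split assignment, attaches the CDC table snapshot split
    /// assignment, and records the columns newly added to auto-refresh-schema sinks.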
1462    fn generate_update_mutation_for_replace_table(
1463        dropped_actors: impl IntoIterator<Item = ActorId>,
1464        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1465        dispatchers: FragmentActorDispatchers,
1466        init_split_assignment: &SplitAssignment,
1467        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
1468        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1469    ) -> Option<Mutation> {
1470        let dropped_actors = dropped_actors.into_iter().collect();
1471
1472        let actor_new_dispatchers = dispatchers
1473            .into_values()
1474            .flatten()
1475            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1476            .collect();
1477
1478        let actor_splits = init_split_assignment
1479            .values()
1480            .flat_map(build_actor_connector_splits)
1481            .collect();
1482        Some(Mutation::Update(UpdateMutation {
1483            actor_new_dispatchers,
1484            merge_update: merge_updates.into_values().flatten().collect(),
1485            dropped_actors,
1486            actor_splits,
1487            actor_cdc_table_snapshot_splits:
1488                build_pb_actor_cdc_table_snapshot_splits_with_generation(
1489                    cdc_table_snapshot_split_assignment,
1490                )
1491                .into(),
1492            sink_add_columns: auto_refresh_schema_sinks
1493                .as_ref()
1494                .into_iter()
1495                .flat_map(|sinks| {
1496                    sinks.iter().map(|sink| {
1497                        (
1498                            sink.original_sink.id,
1499                            PbSinkAddColumns {
1500                                fields: sink
1501                                    .newly_add_fields
1502                                    .iter()
1503                                    .map(|field| field.to_prost())
1504                                    .collect(),
1505                            },
1506                        )
1507                    })
1508                })
1509                .collect(),
1510            ..Default::default()
1511        }))
1512    }
1513
1514    /// Returns the ids of streaming jobs dropped by this command (`DropStreamingJobs` only).
1515    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1516        match self {
1517            Command::DropStreamingJobs {
1518                streaming_job_ids, ..
1519            } => Some(streaming_job_ids.iter().cloned()),
1520            _ => None,
1521        }
1522        .into_iter()
1523        .flatten()
1524    }
1525}
1526
1527impl Command {
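    /// For each downstream actor, collects the upstream actors (with their host addresses) it
    /// should connect to, keyed by upstream fragment id. The upstream info is derived from the
    /// given dispatchers and, for reschedules, additionally from the surviving actors of the
    /// upstream fragments of newly added actors.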
1528    #[expect(clippy::type_complexity)]
1529    pub(super) fn collect_actor_upstreams(
1530        actor_dispatchers: impl Iterator<
1531            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1532        >,
1533        reschedule_dispatcher_update: Option<(
1534            &HashMap<FragmentId, Reschedule>,
1535            &HashMap<FragmentId, HashSet<ActorId>>,
1536        )>,
1537        graph_info: &InflightDatabaseInfo,
1538        control_stream_manager: &ControlStreamManager,
1539    ) -> HashMap<ActorId, ActorUpstreams> {
1540        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
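        // Derive each downstream actor's upstreams from the dispatchers of its upstream actors.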
1541        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1542            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1543            for (upstream_actor_id, dispatchers) in upstream_actors {
1544                let upstream_actor_location =
1545                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1546                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
1547                for downstream_actor_id in dispatchers
1548                    .iter()
1549                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1550                {
1551                    actor_upstreams
1552                        .entry(*downstream_actor_id)
1553                        .or_default()
1554                        .entry(upstream_fragment_id)
1555                        .or_default()
1556                        .insert(
1557                            upstream_actor_id,
1558                            PbActorInfo {
1559                                actor_id: upstream_actor_id,
1560                                host: Some(upstream_actor_host.clone()),
1561                            },
1562                        );
1563                }
1564            }
1565        }
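        // For reschedules, newly added actors additionally take every surviving actor of their
        // upstream fragments as upstreams; upstream actors removed by the reschedule are skipped.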
1566        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1567            for reschedule in reschedules.values() {
1568                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1569                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1570                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1571                    for upstream_actor_id in fragment_actors
1572                        .get(upstream_fragment_id)
1573                        .expect("should exist")
1574                    {
1575                        let upstream_actor_location =
1576                            upstream_fragment.actors[upstream_actor_id].worker_id;
1577                        let upstream_actor_host =
1578                            control_stream_manager.host_addr(upstream_actor_location);
1579                        if let Some(upstream_reschedule) = upstream_reschedule
1580                            && upstream_reschedule
1581                                .removed_actors
1582                                .contains(upstream_actor_id)
1583                        {
1584                            continue;
1585                        }
1586                        for (_, downstream_actor_id) in
1587                            reschedule
1588                                .added_actors
1589                                .iter()
1590                                .flat_map(|(worker_id, actors)| {
1591                                    actors.iter().map(|actor| (*worker_id, *actor))
1592                                })
1593                        {
1594                            actor_upstreams
1595                                .entry(downstream_actor_id)
1596                                .or_default()
1597                                .entry(*upstream_fragment_id)
1598                                .or_default()
1599                                .insert(
1600                                    *upstream_actor_id,
1601                                    PbActorInfo {
1602                                        actor_id: *upstream_actor_id,
1603                                        host: Some(upstream_actor_host.clone()),
1604                                    },
1605                                );
1606                        }
1607                    }
1608                }
1609            }
1610        }
1611        actor_upstreams
1612    }
1613}