risingwave_meta/barrier/command.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::ActorMapping;
23use risingwave_common::must_match;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
27use risingwave_connector::source::SplitImpl;
28use risingwave_connector::source::cdc::{
29    CdcTableSnapshotSplitAssignment, build_pb_actor_cdc_table_snapshot_splits,
30};
31use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
32use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
33use risingwave_meta_model::WorkerId;
34use risingwave_pb::catalog::CreateType;
35use risingwave_pb::common::ActorInfo;
36use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation;
40use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
43use risingwave_pb::stream_plan::update_mutation::*;
44use risingwave_pb::stream_plan::{
45    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
46    Dispatchers, DropSubscriptionsMutation, LoadFinishMutation, PauseMutation, ResumeMutation,
47    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
48    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
49};
50use risingwave_pb::stream_service::BarrierCompleteResponse;
51use tracing::warn;
52
53use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
54use crate::barrier::InflightSubscriptionInfo;
55use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
56use crate::barrier::edge_builder::FragmentEdgeBuildResult;
57use crate::barrier::info::BarrierInfo;
58use crate::barrier::rpc::ControlStreamManager;
59use crate::barrier::utils::collect_resp_info;
60use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
61use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
62use crate::manager::{StreamingJob, StreamingJobType};
63use crate::model::{
64    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
65    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
66    StreamJobFragments, StreamJobFragmentsToCreate,
67};
68use crate::stream::{
69    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
70    JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, build_actor_connector_splits,
71};
72
/// [`Reschedule`] is for [`Command::RescheduleFragment`], which reschedules the actors
/// of a fragment, e.g. for scaling or migration.
75#[derive(Debug, Clone)]
76pub struct Reschedule {
77    /// Added actors in this fragment.
78    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
79
80    /// Removed actors in this fragment.
81    pub removed_actors: HashSet<ActorId>,
82
83    /// Vnode bitmap updates for some actors in this fragment.
84    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
85
86    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
87    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there is an upstream fragment and the current fragment is
    /// hash-sharded.
92    pub upstream_dispatcher_mapping: Option<ActorMapping>,
93
94    /// The downstream fragments of this fragment.
95    pub downstream_fragment_ids: Vec<FragmentId>,
96
97    /// Reassigned splits for source actors.
98    /// It becomes the `actor_splits` in [`UpdateMutation`].
99    /// `Source` and `SourceBackfill` are handled together here.
100    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
101
102    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
103
104    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
105}
106
107/// Replacing an old job with a new one. All actors in the job will be rebuilt.
108///
109/// Current use cases:
110/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
111/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
112///   will replace a table job's plan.
113#[derive(Debug, Clone)]
114pub struct ReplaceStreamJobPlan {
115    pub old_fragments: StreamJobFragments,
116    pub new_fragments: StreamJobFragmentsToCreate,
117    /// Downstream jobs of the replaced job need to update their `Merge` node to
118    /// connect to the new fragment.
119    pub replace_upstream: FragmentReplaceUpstream,
120    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't need to worry about
    /// `backfill_splits`.
126    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job id used for the new table fragments.
    pub tmp_id: u32,
131    /// The state table ids to be dropped.
132    pub to_drop_state_table_ids: Vec<TableId>,
133    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
134    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
135}
136
137impl ReplaceStreamJobPlan {
138    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
139        let mut fragment_changes = HashMap::new();
140        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
141            let fragment_change = CommandFragmentChanges::NewFragment {
142                job_id: self.streaming_job.id().into(),
143                info: new_fragment,
144                is_existing: false,
145            };
146            fragment_changes
147                .try_insert(fragment_id, fragment_change)
148                .expect("non-duplicate");
149        }
150        for fragment in self.old_fragments.fragments.values() {
151            fragment_changes
152                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
153                .expect("non-duplicate");
154        }
155        for (fragment_id, replace_map) in &self.replace_upstream {
156            fragment_changes
157                .try_insert(
158                    *fragment_id,
159                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
160                )
161                .expect("non-duplicate");
162        }
163        if let Some(sinks) = &self.auto_refresh_schema_sinks {
164            for sink in sinks {
165                let fragment_change = CommandFragmentChanges::NewFragment {
166                    job_id: TableId::new(sink.original_sink.id as _),
167                    info: sink.new_fragment_info(),
168                    is_existing: false,
169                };
170                fragment_changes
171                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
172                    .expect("non-duplicate");
173                fragment_changes
174                    .try_insert(
175                        sink.original_fragment.fragment_id,
176                        CommandFragmentChanges::RemoveFragment,
177                    )
178                    .expect("non-duplicate");
179            }
180        }
181        fragment_changes
182    }
183
    /// Returns the mapping from replaced fragments: `old_fragment_id` -> `new_fragment_id`.
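    ///
    /// Illustrative sketch (hypothetical fragment ids): if fragment `1`'s `Merge` node switches
    /// its upstream from fragment `2` to fragment `10`, then `replace_upstream` contains
    /// `{1: {2: 10}}` and this method returns `{2: 10}`:
    ///
    /// ```ignore
    /// let replacements = plan.fragment_replacements();
    /// assert_eq!(replacements.get(&2), Some(&10));
    /// ```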
185    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
186        let mut fragment_replacements = HashMap::new();
187        for (upstream_fragment_id, new_upstream_fragment_id) in
188            self.replace_upstream.values().flatten()
189        {
190            {
191                let r =
192                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
193                if let Some(r) = r {
194                    assert_eq!(
195                        *new_upstream_fragment_id, r,
196                        "one fragment is replaced by multiple fragments"
197                    );
198                }
199            }
200        }
201        fragment_replacements
202    }
203}
204
205#[derive(educe::Educe, Clone)]
206#[educe(Debug)]
207pub struct CreateStreamingJobCommandInfo {
208    #[educe(Debug(ignore))]
209    pub stream_job_fragments: StreamJobFragmentsToCreate,
210    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
211    pub init_split_assignment: SplitAssignment,
212    pub definition: String,
213    pub job_type: StreamingJobType,
214    pub create_type: CreateType,
215    pub streaming_job: StreamingJob,
216    pub fragment_backfill_ordering: FragmentBackfillOrder,
217    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
218}
219
220impl StreamJobFragments {
221    pub(super) fn new_fragment_info(
222        &self,
223    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
224        self.fragments.values().map(|fragment| {
225            (
226                fragment.fragment_id,
227                InflightFragmentInfo {
228                    fragment_id: fragment.fragment_id,
229                    distribution_type: fragment.distribution_type.into(),
230                    nodes: fragment.nodes.clone(),
231                    actors: fragment
232                        .actors
233                        .iter()
234                        .map(|actor| {
235                            (
236                                actor.actor_id,
237                                InflightActorInfo {
238                                    worker_id: self
239                                        .actor_status
240                                        .get(&actor.actor_id)
241                                        .expect("should exist")
242                                        .worker_id()
243                                        as WorkerId,
244                                    vnode_bitmap: actor.vnode_bitmap.clone(),
245                                },
246                            )
247                        })
248                        .collect(),
249                    state_table_ids: fragment
250                        .state_table_ids
251                        .iter()
252                        .map(|table_id| TableId::new(*table_id))
253                        .collect(),
254                },
255            )
256        })
257    }
258}
259
260#[derive(Debug, Clone)]
261pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`.
    ///
    /// The `snapshot_backfill_epoch` is `None` at the beginning and is filled in
    /// by the global barrier worker when handling the command.
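    ///
    /// A minimal sketch of the lifecycle (illustrative; the table id and `prev_epoch` are placeholders):
    ///
    /// ```ignore
    /// let mut info = SnapshotBackfillInfo {
    ///     upstream_mv_table_id_to_backfill_epoch: HashMap::from([(TableId::new(1), None)]),
    /// };
    /// // Filled in by the global barrier worker when handling the command:
    /// info.upstream_mv_table_id_to_backfill_epoch
    ///     .insert(TableId::new(1), Some(prev_epoch));
    /// ```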
265    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
266}
267
268#[derive(Debug, Clone)]
269pub enum CreateStreamingJobType {
270    Normal,
271    SinkIntoTable(ReplaceStreamJobPlan),
272    SnapshotBackfill(SnapshotBackfillInfo),
273}
274
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different work after the barrier is collected](CommandContext::post_collect).
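///
/// # Example
///
/// A minimal sketch (illustrative, not a doctest) using the helpers defined below:
///
/// ```ignore
/// let cmd = Command::pause();
/// // Every command except `Resume` forces the next barrier to be a checkpoint.
/// assert!(cmd.need_checkpoint());
/// assert!(!Command::resume().need_checkpoint());
/// ```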
// FIXME: this enum is significantly large on the stack; consider boxing it
279#[derive(Debug)]
280pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
283    Flush,
284
285    /// `Pause` command generates a `Pause` barrier **only if**
286    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
287    Pause,
288
    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused for the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
292    Resume,
293
    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, by reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of the compute nodes to
    /// drop the actors, and then deletes the job fragments info from the meta store.
301    DropStreamingJobs {
302        table_fragments_ids: HashSet<TableId>,
303        actors: Vec<ActorId>,
304        unregistered_state_table_ids: HashSet<TableId>,
305        unregistered_fragment_ids: HashSet<FragmentId>,
306    },
307
    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should pass through them.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled; only then will the state of `TableFragments`
    /// be set to `Created`.
317    CreateStreamingJob {
318        info: CreateStreamingJobCommandInfo,
319        job_type: CreateStreamingJobType,
320        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
321    },
322    MergeSnapshotBackfillStreamingJobs(
323        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
324    ),
325
    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// Which actors' barriers should be collected, and the post-collection behavior of this command,
    /// are very similar to those of the `Create` and `Drop` commands, for added and removed actors respectively.
331    RescheduleFragment {
332        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`.
334        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
335        // Used for updating additional metadata after the barrier ends
336        post_updates: JobReschedulePostUpdates,
337    },
338
    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema changes.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
    /// of the Merge executors are additionally changed.
345    ReplaceStreamJob(ReplaceStreamJobPlan),
346
347    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
348    /// changed splits.
349    SourceChangeSplit(SplitAssignment),
350
    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
353    Throttle(ThrottleConfig),
354
    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
    /// materialize executor to start storing the old value for the subscription.
357    CreateSubscription {
358        subscription_id: u32,
359        upstream_mv_table_id: TableId,
360        retention_second: u64,
361    },
362
    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
    /// materialize executor to stop storing the old value when there is no
    /// subscription depending on it.
366    DropSubscription {
367        subscription_id: u32,
368        upstream_mv_table_id: TableId,
369    },
370
371    ConnectorPropsChange(ConnectorPropsChange),
372
    /// `StartFragmentBackfill` command triggers backfilling for the scans specified by `fragment_id`.
374    StartFragmentBackfill {
375        fragment_ids: Vec<FragmentId>,
376    },
377
    /// `Refresh` command generates a barrier to refresh a table by truncating its state
    /// and reloading data from the source.
380    Refresh {
381        table_id: TableId,
382        associated_source_id: TableId,
383    },
384    LoadFinish {
385        table_id: TableId,
386        associated_source_id: TableId,
387    },
388}
389
390// For debugging and observability purposes. Can add more details later if needed.
391impl std::fmt::Display for Command {
392    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
393        match self {
394            Command::Flush => write!(f, "Flush"),
395            Command::Pause => write!(f, "Pause"),
396            Command::Resume => write!(f, "Resume"),
397            Command::DropStreamingJobs {
398                table_fragments_ids,
399                ..
400            } => {
401                write!(
402                    f,
403                    "DropStreamingJobs: {}",
404                    table_fragments_ids.iter().sorted().join(", ")
405                )
406            }
407            Command::CreateStreamingJob { info, .. } => {
408                write!(f, "CreateStreamingJob: {}", info.streaming_job)
409            }
410            Command::MergeSnapshotBackfillStreamingJobs(_) => {
411                write!(f, "MergeSnapshotBackfillStreamingJobs")
412            }
413            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
414            Command::ReplaceStreamJob(plan) => {
415                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
416            }
417            Command::SourceChangeSplit(_) => write!(f, "SourceChangeSplit"),
418            Command::Throttle(_) => write!(f, "Throttle"),
419            Command::CreateSubscription {
420                subscription_id, ..
421            } => write!(f, "CreateSubscription: {subscription_id}"),
422            Command::DropSubscription {
423                subscription_id, ..
424            } => write!(f, "DropSubscription: {subscription_id}"),
425            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
426            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
427            Command::Refresh {
428                table_id,
429                associated_source_id,
430            } => write!(
431                f,
432                "Refresh: {} (source: {})",
433                table_id, associated_source_id
434            ),
435            Command::LoadFinish {
436                table_id,
437                associated_source_id,
438            } => write!(
439                f,
440                "LoadFinish: {} (source: {})",
441                table_id, associated_source_id
442            ),
443        }
444    }
445}
446
447impl Command {
448    pub fn pause() -> Self {
449        Self::Pause
450    }
451
452    pub fn resume() -> Self {
453        Self::Resume
454    }
455
456    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
457        Self::DropStreamingJobs {
458            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
459            actors: table_fragments.actor_ids(),
460            unregistered_state_table_ids: table_fragments
461                .all_table_ids()
462                .map(TableId::new)
463                .collect(),
464            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
465        }
466    }
467
468    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
469        match self {
470            Command::Flush => None,
471            Command::Pause => None,
472            Command::Resume => None,
473            Command::DropStreamingJobs {
474                unregistered_fragment_ids,
475                ..
476            } => Some(
477                unregistered_fragment_ids
478                    .iter()
479                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
480                    .collect(),
481            ),
482            Command::CreateStreamingJob { info, job_type, .. } => {
483                assert!(
484                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
485                    "should handle fragment changes separately for snapshot backfill"
486                );
487                let mut changes: HashMap<_, _> = info
488                    .stream_job_fragments
489                    .new_fragment_info()
490                    .map(|(fragment_id, fragment_info)| {
491                        (
492                            fragment_id,
493                            CommandFragmentChanges::NewFragment {
494                                job_id: info.streaming_job.id().into(),
495                                info: fragment_info,
496                                is_existing: false,
497                            },
498                        )
499                    })
500                    .collect();
501
502                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
503                    let extra_change = plan.fragment_changes();
504                    changes.extend(extra_change);
505                }
506
507                Some(changes)
508            }
509            Command::RescheduleFragment { reschedules, .. } => Some(
510                reschedules
511                    .iter()
512                    .map(|(fragment_id, reschedule)| {
513                        (
514                            *fragment_id,
515                            CommandFragmentChanges::Reschedule {
516                                new_actors: reschedule
517                                    .added_actors
518                                    .iter()
519                                    .flat_map(|(node_id, actors)| {
520                                        actors.iter().map(|actor_id| {
521                                            (
522                                                *actor_id,
523                                                InflightActorInfo {
524                                                    worker_id: *node_id,
525                                                    vnode_bitmap: reschedule
526                                                        .newly_created_actors
527                                                        .get(actor_id)
528                                                        .expect("should exist")
529                                                        .0
530                                                        .0
531                                                        .vnode_bitmap
532                                                        .clone(),
533                                                },
534                                            )
535                                        })
536                                    })
537                                    .collect(),
538                                actor_update_vnode_bitmap: reschedule
539                                    .vnode_bitmap_updates
540                                    .iter()
541                                    .filter(|(actor_id, _)| {
542                                        // only keep the existing actors
543                                        !reschedule.newly_created_actors.contains_key(actor_id)
544                                    })
545                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
546                                    .collect(),
547                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
548                            },
549                        )
550                    })
551                    .collect(),
552            ),
553            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
554            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
555            Command::SourceChangeSplit(_) => None,
556            Command::Throttle(_) => None,
557            Command::CreateSubscription { .. } => None,
558            Command::DropSubscription { .. } => None,
559            Command::ConnectorPropsChange(_) => None,
560            Command::StartFragmentBackfill { .. } => None,
561            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
562            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
563        }
564    }
565
566    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints
568        !matches!(self, Command::Resume)
569    }
570}
571
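/// The kind of a barrier: the initial barrier, a regular (non-checkpoint) barrier, or a
/// checkpoint barrier.
///
/// A minimal sketch (illustrative, not a doctest; `prev_epoch` is a placeholder) of the
/// helpers defined below:
///
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![prev_epoch]);
/// assert!(kind.is_checkpoint() && !kind.is_initial());
/// assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
/// assert_eq!(BarrierKind::Barrier.as_str_name(), "Barrier");
/// ```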
572#[derive(Debug, Clone)]
573pub enum BarrierKind {
574    Initial,
575    Barrier,
    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch.
577    Checkpoint(Vec<u64>),
578}
579
580impl BarrierKind {
581    pub fn to_protobuf(&self) -> PbBarrierKind {
582        match self {
583            BarrierKind::Initial => PbBarrierKind::Initial,
584            BarrierKind::Barrier => PbBarrierKind::Barrier,
585            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
586        }
587    }
588
589    pub fn is_checkpoint(&self) -> bool {
590        matches!(self, BarrierKind::Checkpoint(_))
591    }
592
593    pub fn is_initial(&self) -> bool {
594        matches!(self, BarrierKind::Initial)
595    }
596
597    pub fn as_str_name(&self) -> &'static str {
598        match self {
599            BarrierKind::Initial => "Initial",
600            BarrierKind::Barrier => "Barrier",
601            BarrierKind::Checkpoint(_) => "Checkpoint",
602        }
603    }
604}
605
/// [`CommandContext`] is used for generating the barrier and doing post-collection work according to the given
/// [`Command`].
608pub(super) struct CommandContext {
609    subscription_info: InflightSubscriptionInfo,
610
611    pub(super) barrier_info: BarrierInfo,
612
613    pub(super) table_ids_to_commit: HashSet<TableId>,
614
615    pub(super) command: Option<Command>,
616
    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
622    _span: tracing::Span,
623}
624
625impl std::fmt::Debug for CommandContext {
626    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
627        f.debug_struct("CommandContext")
628            .field("barrier_info", &self.barrier_info)
629            .field("command", &self.command)
630            .finish()
631    }
632}
633
634impl std::fmt::Display for CommandContext {
635    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
636        write!(
637            f,
638            "prev_epoch={}, curr_epoch={}, kind={}",
639            self.barrier_info.prev_epoch.value().0,
640            self.barrier_info.curr_epoch.value().0,
641            self.barrier_info.kind.as_str_name()
642        )?;
643        if let Some(command) = &self.command {
644            write!(f, ", command={}", command)?;
645        }
646        Ok(())
647    }
648}
649
650impl CommandContext {
651    pub(super) fn new(
652        barrier_info: BarrierInfo,
653        subscription_info: InflightSubscriptionInfo,
654        table_ids_to_commit: HashSet<TableId>,
655        command: Option<Command>,
656        span: tracing::Span,
657    ) -> Self {
658        Self {
659            subscription_info,
660            barrier_info,
661            table_ids_to_commit,
662            command,
663            _span: span,
664        }
665    }
666
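
    /// Compute the epoch up to which change logs can be truncated for a subscription with the
    /// given retention: roughly `prev_epoch` shifted back by `retention_second` seconds, falling
    /// back to `prev_epoch` itself if the shifted timestamp is invalid.
    ///
    /// A condensed sketch of the arithmetic (illustrative; `prev_epoch` stands for the prev
    /// epoch's `Epoch` value):
    ///
    /// ```ignore
    /// let cutoff_secs = prev_epoch.as_timestamptz().timestamp() - retention_second as i64;
    /// let truncate_epoch = Epoch::from_unix_millis(
    ///     Timestamptz::from_secs(cutoff_secs).expect("valid timestamp").timestamp_millis() as u64,
    /// );
    /// ```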
667    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
668        let Some(truncate_timestamptz) = Timestamptz::from_secs(
669            self.barrier_info
670                .prev_epoch
671                .value()
672                .as_timestamptz()
673                .timestamp()
674                - retention_second as i64,
675        ) else {
676            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
677            return self.barrier_info.prev_epoch.value();
678        };
679        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
680    }
681
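    /// Fold the `BarrierCompleteResponse`s collected for this barrier into the shared
    /// [`CommitEpochInfo`]: synced SSTs and their contexts, table watermarks, new table fragment
    /// infos, table change log deltas (bounded by subscription retentions and pinned backfill
    /// epochs), and vector index deltas.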
682    pub(super) fn collect_commit_epoch_info(
683        &self,
684        info: &mut CommitEpochInfo,
685        resps: Vec<BarrierCompleteResponse>,
686        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
687    ) {
688        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts, vector_index_adds) =
689            collect_resp_info(resps);
690
691        let new_table_fragment_infos =
692            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
693                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
694            {
695                let table_fragments = &info.stream_job_fragments;
696                let mut table_ids: HashSet<_> = table_fragments
697                    .internal_table_ids()
698                    .into_iter()
699                    .map(TableId::new)
700                    .collect();
701                if let Some(mv_table_id) = table_fragments.mv_table_id() {
702                    table_ids.insert(TableId::new(mv_table_id));
703                }
704
705                vec![NewTableFragmentInfo { table_ids }]
706            } else {
707                vec![]
708            };
709
710        let mut mv_log_store_truncate_epoch = HashMap::new();
711        // TODO: may collect cross db snapshot backfill
712        let mut update_truncate_epoch =
713            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
714                .entry(table_id.table_id)
715            {
716                Entry::Occupied(mut entry) => {
717                    let prev_truncate_epoch = entry.get_mut();
718                    if truncate_epoch < *prev_truncate_epoch {
719                        *prev_truncate_epoch = truncate_epoch;
720                    }
721                }
722                Entry::Vacant(entry) => {
723                    entry.insert(truncate_epoch);
724                }
725            };
726        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
727            if let Some(truncate_epoch) = subscriptions
728                .values()
729                .max()
730                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
731            {
732                update_truncate_epoch(*mv_table_id, truncate_epoch);
733            }
734        }
735        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
736            for mv_table_id in upstream_mv_table_ids {
737                update_truncate_epoch(mv_table_id, backfill_epoch);
738            }
739        }
740
741        let table_new_change_log = build_table_change_log_delta(
742            old_value_ssts.into_iter(),
743            synced_ssts.iter().map(|sst| &sst.sst_info),
744            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
745            mv_log_store_truncate_epoch.into_iter(),
746        );
747
748        let epoch = self.barrier_info.prev_epoch();
749        for table_id in &self.table_ids_to_commit {
750            info.tables_to_commit
751                .try_insert(*table_id, epoch)
752                .expect("non duplicate");
753        }
754
755        info.sstables.extend(synced_ssts);
756        info.new_table_watermarks.extend(new_table_watermarks);
757        info.sst_to_context.extend(sst_to_context);
758        info.new_table_fragment_infos
759            .extend(new_table_fragment_infos);
760        info.change_log_delta.extend(table_new_change_log);
761        for (table_id, vector_index_adds) in vector_index_adds {
762            info.vector_index_delta
763                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
764                .expect("non-duplicate");
765        }
766        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
767            for fragment in job_info.stream_job_fragments.fragments.values() {
768                visit_stream_node_cont(&fragment.nodes, |node| {
769                    match node.node_body.as_ref().unwrap() {
770                        NodeBody::VectorIndexWrite(vector_index_write) => {
771                            let index_table = vector_index_write.table.as_ref().unwrap();
772                            info.vector_index_delta
773                                .try_insert(
774                                    index_table.id.into(),
775                                    VectorIndexDelta::Init(PbVectorIndexInit {
776                                        info: Some(index_table.vector_index_info.unwrap()),
777                                    }),
778                                )
779                                .expect("non-duplicate");
780                            false
781                        }
782                        _ => true,
783                    }
784                })
785            }
786        }
787    }
788}
789
790impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the `dispatcher`s of the `DispatchExecutor`s and the `actor_upstreams` of the `MergeNode`s.
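    ///
    /// A minimal sketch (illustrative, not a doctest; `edges` and `control_stream_manager` are
    /// assumed to be prepared by the caller):
    ///
    /// ```ignore
    /// // `Pause` only yields a mutation when the cluster is not already paused.
    /// let m = Command::Pause.to_mutation(false, &mut edges, &control_stream_manager);
    /// assert!(matches!(m, Some(Mutation::Pause(_))));
    /// assert!(Command::Pause.to_mutation(true, &mut edges, &control_stream_manager).is_none());
    /// ```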
794    pub(super) fn to_mutation(
795        &self,
796        is_currently_paused: bool,
797        edges: &mut Option<FragmentEdgeBuildResult>,
798        control_stream_manager: &ControlStreamManager,
799    ) -> Option<Mutation> {
800        match self {
801            Command::Flush => None,
802
803            Command::Pause => {
804                // Only pause when the cluster is not already paused.
805                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
806                if !is_currently_paused {
807                    Some(Mutation::Pause(PauseMutation {}))
808                } else {
809                    None
810                }
811            }
812
813            Command::Resume => {
                // Only resume when the cluster is paused for the same reason.
815                if is_currently_paused {
816                    Some(Mutation::Resume(ResumeMutation {}))
817                } else {
818                    None
819                }
820            }
821
822            Command::SourceChangeSplit(change) => {
823                let mut diff = HashMap::new();
824
825                for actor_splits in change.values() {
826                    diff.extend(actor_splits.clone());
827                }
828
829                Some(Mutation::Splits(SourceChangeSplitMutation {
830                    actor_splits: build_actor_connector_splits(&diff),
831                }))
832            }
833
834            Command::Throttle(config) => {
835                let mut actor_to_apply = HashMap::new();
836                for per_fragment in config.values() {
837                    actor_to_apply.extend(
838                        per_fragment
839                            .iter()
840                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
841                    );
842                }
843
844                Some(Mutation::Throttle(ThrottleMutation {
845                    actor_throttle: actor_to_apply,
846                }))
847            }
848
849            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
850                actors: actors.clone(),
851            })),
852
853            Command::CreateStreamingJob {
854                info:
855                    CreateStreamingJobCommandInfo {
856                        stream_job_fragments: table_fragments,
857                        init_split_assignment: split_assignment,
858                        upstream_fragment_downstreams,
859                        fragment_backfill_ordering,
860                        cdc_table_snapshot_split_assignment,
861                        ..
862                    },
863                job_type,
864                ..
865            } => {
866                let edges = edges.as_mut().expect("should exist");
867                let added_actors = table_fragments.actor_ids();
868                let actor_splits = split_assignment
869                    .values()
870                    .flat_map(build_actor_connector_splits)
871                    .collect();
872                let subscriptions_to_add =
873                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
874                        job_type
875                    {
876                        snapshot_backfill_info
877                            .upstream_mv_table_id_to_backfill_epoch
878                            .keys()
879                            .map(|table_id| SubscriptionUpstreamInfo {
880                                subscriber_id: table_fragments.stream_job_id().table_id,
881                                upstream_mv_table_id: table_id.table_id,
882                            })
883                            .collect()
884                    } else {
885                        Default::default()
886                    };
887                let backfill_nodes_to_pause: Vec<_> =
888                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
889                        .into_iter()
890                        .collect();
891                let add = Some(Mutation::Add(AddMutation {
892                    actor_dispatchers: edges
893                        .dispatchers
894                        .extract_if(|fragment_id, _| {
895                            upstream_fragment_downstreams.contains_key(fragment_id)
896                        })
897                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
898                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
899                        .collect(),
900                    added_actors,
901                    actor_splits,
902                    // If the cluster is already paused, the new actors should be paused too.
903                    pause: is_currently_paused,
904                    subscriptions_to_add,
905                    backfill_nodes_to_pause,
906                    actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
907                        cdc_table_snapshot_split_assignment.clone(),
908                    ),
909                }));
910
911                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
912                    old_fragments,
913                    init_split_assignment,
914                    replace_upstream,
915                    upstream_fragment_downstreams,
916                    cdc_table_snapshot_split_assignment,
917                    ..
918                }) = job_type
919                {
920                    let merge_updates = edges
921                        .merge_updates
922                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
923                        .collect();
924                    let dispatchers = edges
925                        .dispatchers
926                        .extract_if(|fragment_id, _| {
927                            upstream_fragment_downstreams.contains_key(fragment_id)
928                        })
929                        .collect();
930                    let update = Self::generate_update_mutation_for_replace_table(
931                        old_fragments.actor_ids(),
932                        merge_updates,
933                        dispatchers,
934                        init_split_assignment,
935                        cdc_table_snapshot_split_assignment,
936                    );
937
938                    Some(Mutation::Combined(CombinedMutation {
939                        mutations: vec![
940                            BarrierMutation { mutation: add },
941                            BarrierMutation { mutation: update },
942                        ],
943                    }))
944                } else {
945                    add
946                }
947            }
948            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
949                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
950                    info: jobs_to_merge
951                        .iter()
952                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
953                            backfill_upstream_tables
954                                .iter()
955                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
956                                    subscriber_id: table_id.table_id,
957                                    upstream_mv_table_id: upstream_table_id.table_id,
958                                })
959                        })
960                        .collect(),
961                }))
962            }
963
964            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
965                old_fragments,
966                replace_upstream,
967                upstream_fragment_downstreams,
968                init_split_assignment,
969                auto_refresh_schema_sinks,
970                cdc_table_snapshot_split_assignment,
971                ..
972            }) => {
973                let edges = edges.as_mut().expect("should exist");
974                let merge_updates = edges
975                    .merge_updates
976                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
977                    .collect();
978                let dispatchers = edges
979                    .dispatchers
980                    .extract_if(|fragment_id, _| {
981                        upstream_fragment_downstreams.contains_key(fragment_id)
982                    })
983                    .collect();
984                Self::generate_update_mutation_for_replace_table(
985                    old_fragments.actor_ids().into_iter().chain(
986                        auto_refresh_schema_sinks
987                            .as_ref()
988                            .into_iter()
989                            .flat_map(|sinks| {
990                                sinks.iter().flat_map(|sink| {
991                                    sink.original_fragment
992                                        .actors
993                                        .iter()
994                                        .map(|actor| actor.actor_id)
995                                })
996                            }),
997                    ),
998                    merge_updates,
999                    dispatchers,
1000                    init_split_assignment,
1001                    cdc_table_snapshot_split_assignment,
1002                )
1003            }
1004
1005            Command::RescheduleFragment {
1006                reschedules,
1007                fragment_actors,
1008                ..
1009            } => {
1010                let mut dispatcher_update = HashMap::new();
1011                for reschedule in reschedules.values() {
1012                    for &(upstream_fragment_id, dispatcher_id) in
1013                        &reschedule.upstream_fragment_dispatcher_ids
1014                    {
1015                        // Find the actors of the upstream fragment.
1016                        let upstream_actor_ids = fragment_actors
1017                            .get(&upstream_fragment_id)
1018                            .expect("should contain");
1019
1020                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1021
1022                        // Record updates for all actors.
1023                        for &actor_id in upstream_actor_ids {
1024                            let added_downstream_actor_id = if upstream_reschedule
1025                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1026                                .unwrap_or(true)
1027                            {
1028                                reschedule
1029                                    .added_actors
1030                                    .values()
1031                                    .flatten()
1032                                    .cloned()
1033                                    .collect()
1034                            } else {
1035                                Default::default()
1036                            };
1037                            // Index with the dispatcher id to check duplicates.
1038                            dispatcher_update
1039                                .try_insert(
1040                                    (actor_id, dispatcher_id),
1041                                    DispatcherUpdate {
1042                                        actor_id,
1043                                        dispatcher_id,
1044                                        hash_mapping: reschedule
1045                                            .upstream_dispatcher_mapping
1046                                            .as_ref()
1047                                            .map(|m| m.to_protobuf()),
1048                                        added_downstream_actor_id,
1049                                        removed_downstream_actor_id: reschedule
1050                                            .removed_actors
1051                                            .iter()
1052                                            .cloned()
1053                                            .collect(),
1054                                    },
1055                                )
1056                                .unwrap();
1057                        }
1058                    }
1059                }
1060                let dispatcher_update = dispatcher_update.into_values().collect();
1061
1062                let mut merge_update = HashMap::new();
1063                for (&fragment_id, reschedule) in reschedules {
1064                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1065                        // Find the actors of the downstream fragment.
1066                        let downstream_actor_ids = fragment_actors
1067                            .get(&downstream_fragment_id)
1068                            .expect("should contain");
1069
                        // Removed downstream actors should be skipped:
                        // newly created actors of the current fragment will not dispatch Update
                        // barriers to them.
1073                        let downstream_removed_actors: HashSet<_> = reschedules
1074                            .get(&downstream_fragment_id)
1075                            .map(|downstream_reschedule| {
1076                                downstream_reschedule
1077                                    .removed_actors
1078                                    .iter()
1079                                    .copied()
1080                                    .collect()
1081                            })
1082                            .unwrap_or_default();
1083
1084                        // Record updates for all actors.
1085                        for &actor_id in downstream_actor_ids {
1086                            if downstream_removed_actors.contains(&actor_id) {
1087                                continue;
1088                            }
1089
1090                            // Index with the fragment id to check duplicates.
1091                            merge_update
1092                                .try_insert(
1093                                    (actor_id, fragment_id),
1094                                    MergeUpdate {
1095                                        actor_id,
1096                                        upstream_fragment_id: fragment_id,
1097                                        new_upstream_fragment_id: None,
1098                                        added_upstream_actors: reschedule
1099                                            .added_actors
1100                                            .iter()
1101                                            .flat_map(|(worker_id, actors)| {
1102                                                let host =
1103                                                    control_stream_manager.host_addr(*worker_id);
1104                                                actors.iter().map(move |actor_id| ActorInfo {
1105                                                    actor_id: *actor_id,
1106                                                    host: Some(host.clone()),
1107                                                })
1108                                            })
1109                                            .collect(),
1110                                        removed_upstream_actor_id: reschedule
1111                                            .removed_actors
1112                                            .iter()
1113                                            .cloned()
1114                                            .collect(),
1115                                    },
1116                                )
1117                                .unwrap();
1118                        }
1119                    }
1120                }
1121                let merge_update = merge_update.into_values().collect();
1122
1123                let mut actor_vnode_bitmap_update = HashMap::new();
1124                for reschedule in reschedules.values() {
1125                    // Record updates for all actors in this fragment.
1126                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1127                        let bitmap = bitmap.to_protobuf();
1128                        actor_vnode_bitmap_update
1129                            .try_insert(actor_id, bitmap)
1130                            .unwrap();
1131                    }
1132                }
1133                let dropped_actors = reschedules
1134                    .values()
1135                    .flat_map(|r| r.removed_actors.iter().copied())
1136                    .collect();
1137                let mut actor_splits = HashMap::new();
1138                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1139
1140                for reschedule in reschedules.values() {
1141                    for (actor_id, splits) in &reschedule.actor_splits {
1142                        actor_splits.insert(
1143                            *actor_id as ActorId,
1144                            ConnectorSplits {
1145                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1146                            },
1147                        );
1148                    }
1149                    actor_cdc_table_snapshot_splits.extend(
1150                        build_pb_actor_cdc_table_snapshot_splits(
1151                            reschedule.cdc_table_snapshot_split_assignment.clone(),
1152                        ),
1153                    );
1154                }
1155
                // We don't create dispatchers in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

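    /// Returns the new actors that this command requires, grouped by worker id and fragment id,
    /// or `None` if the command creates no actors. Only `CreateStreamingJob`,
    /// `RescheduleFragment`, and `ReplaceStreamJob` create actors here; for snapshot-backfill
    /// jobs the actors are handled separately.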
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // For snapshot backfill, the actors to create are handled separately.
                        return None;
                    }
                };
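                // Chain the actors of the sink-into-table replace plan (if any) with the actors
                // of the job being created.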
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
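                // Group the newly created actors by worker and then by fragment; each fragment
                // entry carries the fragment's stream node plan plus the actors (with their
                // upstreams and dispatchers) to build on that worker.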
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

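    /// Builds the `Update` mutation applied when a table is replaced: the new dispatchers to add,
    /// the merge updates for downstream fragments, the actors to drop, and the initial source
    /// split and CDC table snapshot split assignments.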
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: &CdcTableSnapshotSplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                cdc_table_snapshot_split_assignment.clone(),
            ),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the tables to be dropped.
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
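    /// Collects, for each downstream actor, the upstream actors it receives data from, keyed by
    /// upstream fragment id and carrying each upstream actor's id and host address.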
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
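        // Wire each downstream actor to its upstream actors according to the given dispatchers.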
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
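        // For reschedules, additionally wire the newly added actors to the remaining upstream
        // actors, skipping upstream actors that the reschedule removes.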
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}