risingwave_meta/barrier/command.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18use std::iter;
19
20use itertools::Itertools;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::TableId;
23use risingwave_common::hash::ActorMapping;
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::SplitImpl;
29use risingwave_connector::source::cdc::{
30    CdcTableSnapshotSplitAssignment, CdcTableSnapshotSplitAssignmentWithGeneration,
31    build_pb_actor_cdc_table_snapshot_splits,
32    build_pb_actor_cdc_table_snapshot_splits_with_generation,
33};
34use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
35use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::catalog::CreateType;
38use risingwave_pb::catalog::table::PbTableType;
39use risingwave_pb::common::PbActorInfo;
40use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
41use risingwave_pb::source::{
42    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
43};
44use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
45use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
46use risingwave_pb::stream_plan::barrier_mutation::Mutation;
47use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
48use risingwave_pb::stream_plan::stream_node::NodeBody;
49use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
50use risingwave_pb::stream_plan::update_mutation::*;
51use risingwave_pb::stream_plan::{
52    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
53    LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo, ResumeMutation,
54    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
55    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
56};
57use risingwave_pb::stream_service::BarrierCompleteResponse;
58use tracing::warn;
59
60use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
61use crate::barrier::InflightSubscriptionInfo;
62use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
63use crate::barrier::edge_builder::FragmentEdgeBuildResult;
64use crate::barrier::info::BarrierInfo;
65use crate::barrier::rpc::ControlStreamManager;
66use crate::barrier::utils::collect_resp_info;
67use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
68use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
69use crate::manager::{StreamingJob, StreamingJobType};
70use crate::model::{
71    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
72    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
73    StreamJobFragments, StreamJobFragmentsToCreate,
74};
75use crate::stream::{
76    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder,
77    JobReschedulePostUpdates, SplitAssignment, SplitState, ThrottleConfig, UpstreamSinkInfo,
78    build_actor_connector_splits,
79};
80
/// [`Reschedule`] is for [`Command::RescheduleFragment`], which is used for rescheduling the
/// actors of a fragment, e.g. for scaling or migration.
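///
/// For example (hypothetical values), moving one actor of a hash-distributed fragment from
/// worker 1 to worker 2 would be described roughly by `added_actors = {2: [new_actor]}`,
/// `removed_actors = {old_actor}`, and recomputed `vnode_bitmap_updates` for the actors that
/// stay on their workers.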
83#[derive(Debug, Clone)]
84pub struct Reschedule {
85    /// Added actors in this fragment.
86    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
87
88    /// Removed actors in this fragment.
89    pub removed_actors: HashSet<ActorId>,
90
91    /// Vnode bitmap updates for some actors in this fragment.
92    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
93
94    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
95    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
96    /// New hash mapping of the upstream dispatcher to be updated.
97    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
100    pub upstream_dispatcher_mapping: Option<ActorMapping>,
101
102    /// The downstream fragments of this fragment.
103    pub downstream_fragment_ids: Vec<FragmentId>,
104
105    /// Reassigned splits for source actors.
106    /// It becomes the `actor_splits` in [`UpdateMutation`].
107    /// `Source` and `SourceBackfill` are handled together here.
108    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
109
110    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
111
112    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
113
114    pub cdc_table_id: Option<u32>,
115}
116
117/// Replacing an old job with a new one. All actors in the job will be rebuilt.
118///
119/// Current use cases:
120/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
121/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
122///   will replace a table job's plan.
123#[derive(Debug, Clone)]
124pub struct ReplaceStreamJobPlan {
125    pub old_fragments: StreamJobFragments,
126    pub new_fragments: StreamJobFragmentsToCreate,
127    /// Downstream jobs of the replaced job need to update their `Merge` node to
128    /// connect to the new fragment.
129    pub replace_upstream: FragmentReplaceUpstream,
130    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't
    /// need to worry about `backfill_splits`.
136    pub init_split_assignment: SplitAssignment,
137    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
138    pub streaming_job: StreamingJob,
    /// The temporary dummy job id used for the new table fragments.
140    pub tmp_id: u32,
141    /// The state table ids to be dropped.
142    pub to_drop_state_table_ids: Vec<TableId>,
143    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
144    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
145}
146
147impl ReplaceStreamJobPlan {
148    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
149        let mut fragment_changes = HashMap::new();
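        // `try_insert(..).expect("non-duplicate")` is used below to assert that each fragment
        // receives at most one change from this plan.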
150        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
151            let fragment_change = CommandFragmentChanges::NewFragment {
152                job_id: self.streaming_job.id().into(),
153                info: new_fragment,
154                is_existing: false,
155            };
156            fragment_changes
157                .try_insert(fragment_id, fragment_change)
158                .expect("non-duplicate");
159        }
160        for fragment in self.old_fragments.fragments.values() {
161            fragment_changes
162                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
163                .expect("non-duplicate");
164        }
165        for (fragment_id, replace_map) in &self.replace_upstream {
166            fragment_changes
167                .try_insert(
168                    *fragment_id,
169                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
170                )
171                .expect("non-duplicate");
172        }
173        if let Some(sinks) = &self.auto_refresh_schema_sinks {
174            for sink in sinks {
175                let fragment_change = CommandFragmentChanges::NewFragment {
176                    job_id: TableId::new(sink.original_sink.id as _),
177                    info: sink.new_fragment_info(),
178                    is_existing: false,
179                };
180                fragment_changes
181                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
182                    .expect("non-duplicate");
183                fragment_changes
184                    .try_insert(
185                        sink.original_fragment.fragment_id,
186                        CommandFragmentChanges::RemoveFragment,
187                    )
188                    .expect("non-duplicate");
189            }
190        }
191        fragment_changes
192    }
193
194    /// `old_fragment_id` -> `new_fragment_id`
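    ///
    /// E.g. (hypothetical ids) a `replace_upstream` of `{ merge_fragment: { 3 => 9 } }` yields a
    /// replacement map of `{ 3 => 9 }`; mapping one old fragment to two different new fragments
    /// would trigger the assertion below.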
195    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
196        let mut fragment_replacements = HashMap::new();
197        for (upstream_fragment_id, new_upstream_fragment_id) in
198            self.replace_upstream.values().flatten()
199        {
200            {
201                let r =
202                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
203                if let Some(r) = r {
204                    assert_eq!(
205                        *new_upstream_fragment_id, r,
206                        "one fragment is replaced by multiple fragments"
207                    );
208                }
209            }
210        }
211        fragment_replacements
212    }
213}
214
215#[derive(educe::Educe, Clone)]
216#[educe(Debug)]
217pub struct CreateStreamingJobCommandInfo {
218    #[educe(Debug(ignore))]
219    pub stream_job_fragments: StreamJobFragmentsToCreate,
220    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
221    pub init_split_assignment: SplitAssignment,
222    pub definition: String,
223    pub job_type: StreamingJobType,
224    pub create_type: CreateType,
225    pub streaming_job: StreamingJob,
226    pub fragment_backfill_ordering: FragmentBackfillOrder,
227    pub cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
228}
229
230impl StreamJobFragments {
231    pub(super) fn new_fragment_info(
232        &self,
233    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
234        self.fragments.values().map(|fragment| {
235            (
236                fragment.fragment_id,
237                InflightFragmentInfo {
238                    fragment_id: fragment.fragment_id,
239                    distribution_type: fragment.distribution_type.into(),
240                    nodes: fragment.nodes.clone(),
241                    actors: fragment
242                        .actors
243                        .iter()
244                        .map(|actor| {
245                            (
246                                actor.actor_id,
247                                InflightActorInfo {
248                                    worker_id: self
249                                        .actor_status
250                                        .get(&actor.actor_id)
251                                        .expect("should exist")
252                                        .worker_id()
253                                        as WorkerId,
254                                    vnode_bitmap: actor.vnode_bitmap.clone(),
255                                },
256                            )
257                        })
258                        .collect(),
259                    state_table_ids: fragment
260                        .state_table_ids
261                        .iter()
262                        .map(|table_id| TableId::new(*table_id))
263                        .collect(),
264                },
265            )
266        })
267    }
268}
269
270#[derive(Debug, Clone)]
271pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    ///
    /// The `snapshot_backfill_epoch` is `None` at the beginning and will be filled in
    /// by the global barrier worker when handling the command.
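    ///
    /// E.g. (hypothetical id) `{ TableId(42): None }` when the command is created, becoming
    /// `{ TableId(42): Some(snapshot_backfill_epoch) }` once the epoch is decided.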
275    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
276}
277
278#[derive(Debug, Clone)]
279pub enum CreateStreamingJobType {
280    Normal,
281    SinkIntoTable(UpstreamSinkInfo),
282    SnapshotBackfill(SnapshotBackfillInfo),
283}
284
285/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
286/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different work after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on the stack; consider boxing it.
289#[derive(Debug)]
290pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
293    Flush,
294
295    /// `Pause` command generates a `Pause` barrier **only if**
296    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
297    Pause,
298
299    /// `Resume` command generates a `Resume` barrier **only
300    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
301    /// will be generated.
302    Resume,
303
304    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, by reference counting, that these
    /// streaming jobs are safe to drop.
307    ///
308    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream managers on the compute nodes to
    /// drop the actors, and then deletes the job fragment info from the meta store.
311    DropStreamingJobs {
312        streaming_job_ids: HashSet<TableId>,
313        actors: Vec<ActorId>,
314        unregistered_state_table_ids: HashSet<TableId>,
315        unregistered_fragment_ids: HashSet<FragmentId>,
316        // target_fragment -> [sink_fragments]
317        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
318    },
319
    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should pass through them.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragment info will be added to the meta store. However, the creation progress will
    /// **last for a while** until the `finish` channel is signaled; the state of `TableFragments`
    /// will then be set to `Created`.
329    CreateStreamingJob {
330        info: CreateStreamingJobCommandInfo,
331        job_type: CreateStreamingJobType,
332        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
333    },
334    MergeSnapshotBackfillStreamingJobs(
335        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
336    ),
337
    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors to collect barriers from, and the post-collection behavior of this command, are
    /// very similar to those of the `Create` and `Drop` commands, for the added and removed actors
    /// respectively.
343    RescheduleFragment {
344        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`.
346        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
347        // Used for updating additional metadata after the barrier ends
348        post_updates: JobReschedulePostUpdates,
349    },
350
    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema changes.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
    /// of the `Merge` executors are additionally changed.
357    ReplaceStreamJob(ReplaceStreamJobPlan),
358
359    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
360    /// changed splits.
361    SourceChangeSplit(SplitState),
362
363    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
365    Throttle(ThrottleConfig),
366
367    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
369    CreateSubscription {
370        subscription_id: u32,
371        upstream_mv_table_id: TableId,
372        retention_second: u64,
373    },
374
375    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when there is no
    /// subscription depending on it.
378    DropSubscription {
379        subscription_id: u32,
380        upstream_mv_table_id: TableId,
381    },
382
383    ConnectorPropsChange(ConnectorPropsChange),
384
    /// `StartFragmentBackfill` command triggers backfill for the specified scan fragments, identified by `fragment_id`.
386    StartFragmentBackfill {
387        fragment_ids: Vec<FragmentId>,
388    },
389
    /// `Refresh` command generates a barrier to refresh a table by truncating its state
    /// and reloading data from the source.
392    Refresh {
393        table_id: TableId,
394        associated_source_id: TableId,
395    },
396    LoadFinish {
397        table_id: TableId,
398        associated_source_id: TableId,
399    },
400}
401
402// For debugging and observability purposes. Can add more details later if needed.
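// E.g. `Command::Flush` displays as "Flush", and a `Command::DropSubscription` with
// `subscription_id` 42 displays as "DropSubscription: 42".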
403impl std::fmt::Display for Command {
404    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
405        match self {
406            Command::Flush => write!(f, "Flush"),
407            Command::Pause => write!(f, "Pause"),
408            Command::Resume => write!(f, "Resume"),
409            Command::DropStreamingJobs {
410                streaming_job_ids, ..
411            } => {
412                write!(
413                    f,
414                    "DropStreamingJobs: {}",
415                    streaming_job_ids.iter().sorted().join(", ")
416                )
417            }
418            Command::CreateStreamingJob { info, .. } => {
419                write!(f, "CreateStreamingJob: {}", info.streaming_job)
420            }
421            Command::MergeSnapshotBackfillStreamingJobs(_) => {
422                write!(f, "MergeSnapshotBackfillStreamingJobs")
423            }
424            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
425            Command::ReplaceStreamJob(plan) => {
426                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
427            }
428            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
429            Command::Throttle(_) => write!(f, "Throttle"),
430            Command::CreateSubscription {
431                subscription_id, ..
432            } => write!(f, "CreateSubscription: {subscription_id}"),
433            Command::DropSubscription {
434                subscription_id, ..
435            } => write!(f, "DropSubscription: {subscription_id}"),
436            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
437            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
438            Command::Refresh {
439                table_id,
440                associated_source_id,
441            } => write!(
442                f,
443                "Refresh: {} (source: {})",
444                table_id, associated_source_id
445            ),
446            Command::LoadFinish {
447                table_id,
448                associated_source_id,
449            } => write!(
450                f,
451                "LoadFinish: {} (source: {})",
452                table_id, associated_source_id
453            ),
454        }
455    }
456}
457
458impl Command {
459    pub fn pause() -> Self {
460        Self::Pause
461    }
462
463    pub fn resume() -> Self {
464        Self::Resume
465    }
466
467    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
468        match self {
469            Command::Flush => None,
470            Command::Pause => None,
471            Command::Resume => None,
472            Command::DropStreamingJobs {
473                unregistered_fragment_ids,
474                dropped_sink_fragment_by_targets,
475                ..
476            } => {
477                let changes = unregistered_fragment_ids
478                    .iter()
479                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
480                    .chain(dropped_sink_fragment_by_targets.iter().map(
481                        |(target_fragment, sink_fragments)| {
482                            (
483                                *target_fragment,
484                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
485                            )
486                        },
487                    ))
488                    .collect();
489
490                Some(changes)
491            }
492            Command::CreateStreamingJob { info, job_type, .. } => {
493                assert!(
494                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
495                    "should handle fragment changes separately for snapshot backfill"
496                );
497                let mut changes: HashMap<_, _> = info
498                    .stream_job_fragments
499                    .new_fragment_info()
500                    .map(|(fragment_id, fragment_info)| {
501                        (
502                            fragment_id,
503                            CommandFragmentChanges::NewFragment {
504                                job_id: info.streaming_job.id().into(),
505                                info: fragment_info,
506                                is_existing: false,
507                            },
508                        )
509                    })
510                    .collect();
511
512                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
513                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
514                    changes.insert(
515                        downstream_fragment_id,
516                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
517                            upstream_fragment_id: ctx.sink_fragment_id,
518                            sink_output_schema: ctx.sink_output_fields.clone(),
519                            project_exprs: ctx.project_exprs.clone(),
520                        }),
521                    );
522                }
523
524                Some(changes)
525            }
526            Command::RescheduleFragment { reschedules, .. } => Some(
527                reschedules
528                    .iter()
529                    .map(|(fragment_id, reschedule)| {
530                        (
531                            *fragment_id,
532                            CommandFragmentChanges::Reschedule {
533                                new_actors: reschedule
534                                    .added_actors
535                                    .iter()
536                                    .flat_map(|(node_id, actors)| {
537                                        actors.iter().map(|actor_id| {
538                                            (
539                                                *actor_id,
540                                                InflightActorInfo {
541                                                    worker_id: *node_id,
542                                                    vnode_bitmap: reschedule
543                                                        .newly_created_actors
544                                                        .get(actor_id)
545                                                        .expect("should exist")
546                                                        .0
547                                                        .0
548                                                        .vnode_bitmap
549                                                        .clone(),
550                                                },
551                                            )
552                                        })
553                                    })
554                                    .collect(),
555                                actor_update_vnode_bitmap: reschedule
556                                    .vnode_bitmap_updates
557                                    .iter()
558                                    .filter(|(actor_id, _)| {
559                                        // only keep the existing actors
560                                        !reschedule.newly_created_actors.contains_key(actor_id)
561                                    })
562                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
563                                    .collect(),
564                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
565                            },
566                        )
567                    })
568                    .collect(),
569            ),
570            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
571            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
572            Command::SourceChangeSplit { .. } => None,
573            Command::Throttle(_) => None,
574            Command::CreateSubscription { .. } => None,
575            Command::DropSubscription { .. } => None,
576            Command::ConnectorPropsChange(_) => None,
577            Command::StartFragmentBackfill { .. } => None,
578            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
579            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
580        }
581    }
582
583    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
585        !matches!(self, Command::Resume)
586    }
587}
588
589#[derive(Debug, Clone)]
590pub enum BarrierKind {
591    Initial,
592    Barrier,
    /// Holds the prev-epochs of the preceding non-checkpoint barriers, plus the prev-epoch of the
    /// current barrier.
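    ///
    /// E.g. `Checkpoint(vec![e1, e2, e3])`, where `e1` and `e2` are the prev-epochs of the
    /// preceding non-checkpoint barriers and `e3` is the prev-epoch of this checkpoint barrier.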
594    Checkpoint(Vec<u64>),
595}
596
597impl BarrierKind {
598    pub fn to_protobuf(&self) -> PbBarrierKind {
599        match self {
600            BarrierKind::Initial => PbBarrierKind::Initial,
601            BarrierKind::Barrier => PbBarrierKind::Barrier,
602            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
603        }
604    }
605
606    pub fn is_checkpoint(&self) -> bool {
607        matches!(self, BarrierKind::Checkpoint(_))
608    }
609
610    pub fn is_initial(&self) -> bool {
611        matches!(self, BarrierKind::Initial)
612    }
613
614    pub fn as_str_name(&self) -> &'static str {
615        match self {
616            BarrierKind::Initial => "Initial",
617            BarrierKind::Barrier => "Barrier",
618            BarrierKind::Checkpoint(_) => "Checkpoint",
619        }
620    }
621}
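// A minimal sketch exercising the `BarrierKind` helpers above; the epoch values are arbitrary
// and only for illustration.
#[cfg(test)]
mod barrier_kind_example {
    use super::*;

    #[test]
    fn barrier_kind_helpers() {
        // A checkpoint barrier carrying the prev-epochs of two preceding non-checkpoint barriers.
        let kind = BarrierKind::Checkpoint(vec![233, 234]);
        assert!(kind.is_checkpoint());
        assert!(!kind.is_initial());
        assert_eq!(kind.as_str_name(), "Checkpoint");
        assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
    }
}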
622
/// [`CommandContext`] is used for generating the barrier and doing post-collection work according
/// to the given [`Command`].
625pub(super) struct CommandContext {
626    subscription_info: InflightSubscriptionInfo,
627
628    pub(super) barrier_info: BarrierInfo,
629
630    pub(super) table_ids_to_commit: HashSet<TableId>,
631
632    pub(super) command: Option<Command>,
633
634    /// The tracing span of this command.
635    ///
    /// Different from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
639    _span: tracing::Span,
640}
641
642impl std::fmt::Debug for CommandContext {
643    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
644        f.debug_struct("CommandContext")
645            .field("barrier_info", &self.barrier_info)
646            .field("command", &self.command)
647            .finish()
648    }
649}
650
651impl std::fmt::Display for CommandContext {
652    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653        write!(
654            f,
655            "prev_epoch={}, curr_epoch={}, kind={}",
656            self.barrier_info.prev_epoch.value().0,
657            self.barrier_info.curr_epoch.value().0,
658            self.barrier_info.kind.as_str_name()
659        )?;
660        if let Some(command) = &self.command {
661            write!(f, ", command={}", command)?;
662        }
663        Ok(())
664    }
665}
666
667impl CommandContext {
668    pub(super) fn new(
669        barrier_info: BarrierInfo,
670        subscription_info: InflightSubscriptionInfo,
671        table_ids_to_commit: HashSet<TableId>,
672        command: Option<Command>,
673        span: tracing::Span,
674    ) -> Self {
675        Self {
676            subscription_info,
677            barrier_info,
678            table_ids_to_commit,
679            command,
680            _span: span,
681        }
682    }
683
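    // Worked example for the truncation epoch below (hypothetical numbers): if `prev_epoch`
    // corresponds to Unix time 1_700_000_000 s and `retention_second` is 86_400, the truncation
    // point is 1_699_913_600 s, i.e. `Epoch::from_unix_millis(1_699_913_600_000)`. If the
    // subtraction falls outside the valid timestamp range, we fall back to `prev_epoch` itself.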
684    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
685        let Some(truncate_timestamptz) = Timestamptz::from_secs(
686            self.barrier_info
687                .prev_epoch
688                .value()
689                .as_timestamptz()
690                .timestamp()
691                - retention_second as i64,
692        ) else {
693            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
694            return self.barrier_info.prev_epoch.value();
695        };
696        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
697    }
698
699    pub(super) fn collect_commit_epoch_info(
700        &self,
701        info: &mut CommitEpochInfo,
702        resps: Vec<BarrierCompleteResponse>,
703        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
704    ) {
705        let (
706            sst_to_context,
707            synced_ssts,
708            new_table_watermarks,
709            old_value_ssts,
710            vector_index_adds,
711            truncate_tables,
712        ) = collect_resp_info(resps);
713
714        let new_table_fragment_infos =
715            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
716                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
717            {
718                let table_fragments = &info.stream_job_fragments;
719                let mut table_ids: HashSet<_> = table_fragments
720                    .internal_table_ids()
721                    .into_iter()
722                    .map(TableId::new)
723                    .collect();
724                if let Some(mv_table_id) = table_fragments.mv_table_id() {
725                    table_ids.insert(TableId::new(mv_table_id));
726                }
727
728                vec![NewTableFragmentInfo { table_ids }]
729            } else {
730                vec![]
731            };
732
733        let mut mv_log_store_truncate_epoch = HashMap::new();
734        // TODO: may collect cross db snapshot backfill
735        let mut update_truncate_epoch =
736            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
737                .entry(table_id.table_id)
738            {
739                Entry::Occupied(mut entry) => {
740                    let prev_truncate_epoch = entry.get_mut();
741                    if truncate_epoch < *prev_truncate_epoch {
742                        *prev_truncate_epoch = truncate_epoch;
743                    }
744                }
745                Entry::Vacant(entry) => {
746                    entry.insert(truncate_epoch);
747                }
748            };
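        // Keep the smallest truncation epoch per table: e.g. (hypothetical epochs) a subscription
        // allowing truncation at epoch 300 and a backfill pinning epoch 200 leave 200 in the map,
        // since log entries must be retained for the longest-retaining reader.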
749        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
750            if let Some(truncate_epoch) = subscriptions
751                .values()
752                .max()
753                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
754            {
755                update_truncate_epoch(*mv_table_id, truncate_epoch);
756            }
757        }
758        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
759            for mv_table_id in upstream_mv_table_ids {
760                update_truncate_epoch(mv_table_id, backfill_epoch);
761            }
762        }
763
764        let table_new_change_log = build_table_change_log_delta(
765            old_value_ssts.into_iter(),
766            synced_ssts.iter().map(|sst| &sst.sst_info),
767            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
768            mv_log_store_truncate_epoch.into_iter(),
769        );
770
771        let epoch = self.barrier_info.prev_epoch();
772        for table_id in &self.table_ids_to_commit {
773            info.tables_to_commit
774                .try_insert(*table_id, epoch)
775                .expect("non duplicate");
776        }
777
778        info.sstables.extend(synced_ssts);
779        info.new_table_watermarks.extend(new_table_watermarks);
780        info.sst_to_context.extend(sst_to_context);
781        info.new_table_fragment_infos
782            .extend(new_table_fragment_infos);
783        info.change_log_delta.extend(table_new_change_log);
784        for (table_id, vector_index_adds) in vector_index_adds {
785            info.vector_index_delta
786                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
787                .expect("non-duplicate");
788        }
789        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
790            for fragment in job_info.stream_job_fragments.fragments.values() {
791                visit_stream_node_cont(&fragment.nodes, |node| {
792                    match node.node_body.as_ref().unwrap() {
793                        NodeBody::VectorIndexWrite(vector_index_write) => {
794                            let index_table = vector_index_write.table.as_ref().unwrap();
795                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
796                            info.vector_index_delta
797                                .try_insert(
798                                    index_table.id.into(),
799                                    VectorIndexDelta::Init(PbVectorIndexInit {
800                                        info: Some(index_table.vector_index_info.unwrap()),
801                                    }),
802                                )
803                                .expect("non-duplicate");
804                            false
805                        }
806                        _ => true,
807                    }
808                })
809            }
810        }
811        info.truncate_tables.extend(truncate_tables);
812    }
813}
814
815impl Command {
816    /// Generate a mutation for the given command.
817    ///
    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor`s and the
    /// `actor_upstreams` of `MergeNode`s.
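    ///
    /// # Example (illustrative; assumes `edges` and `control_stream_manager` are in scope)
    ///
    /// ```ignore
    /// // When the cluster is not paused, `Pause` emits a `PauseMutation`;
    /// // when it is already paused, no mutation is emitted.
    /// assert!(matches!(
    ///     Command::Pause.to_mutation(false, &mut edges, &control_stream_manager),
    ///     Some(Mutation::Pause(_))
    /// ));
    /// assert!(
    ///     Command::Pause
    ///         .to_mutation(true, &mut edges, &control_stream_manager)
    ///         .is_none()
    /// );
    /// ```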
819    pub(super) fn to_mutation(
820        &self,
821        is_currently_paused: bool,
822        edges: &mut Option<FragmentEdgeBuildResult>,
823        control_stream_manager: &ControlStreamManager,
824    ) -> Option<Mutation> {
825        match self {
826            Command::Flush => None,
827
828            Command::Pause => {
829                // Only pause when the cluster is not already paused.
830                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
831                if !is_currently_paused {
832                    Some(Mutation::Pause(PauseMutation {}))
833                } else {
834                    None
835                }
836            }
837
838            Command::Resume => {
839                // Only resume when the cluster is paused with the same reason.
840                if is_currently_paused {
841                    Some(Mutation::Resume(ResumeMutation {}))
842                } else {
843                    None
844                }
845            }
846
847            Command::SourceChangeSplit(SplitState {
848                split_assignment, ..
849            }) => {
850                let mut diff = HashMap::new();
851
852                for actor_splits in split_assignment.values() {
853                    diff.extend(actor_splits.clone());
854                }
855
856                Some(Mutation::Splits(SourceChangeSplitMutation {
857                    actor_splits: build_actor_connector_splits(&diff),
858                }))
859            }
860
861            Command::Throttle(config) => {
862                let mut actor_to_apply = HashMap::new();
863                for per_fragment in config.values() {
864                    actor_to_apply.extend(
865                        per_fragment
866                            .iter()
867                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
868                    );
869                }
870
871                Some(Mutation::Throttle(ThrottleMutation {
872                    actor_throttle: actor_to_apply,
873                }))
874            }
875
876            Command::DropStreamingJobs {
877                actors,
878                dropped_sink_fragment_by_targets,
879                ..
880            } => Some(Mutation::Stop(StopMutation {
881                actors: actors.clone(),
882                dropped_sink_fragments: dropped_sink_fragment_by_targets
883                    .values()
884                    .flatten()
885                    .cloned()
886                    .collect(),
887            })),
888
889            Command::CreateStreamingJob {
890                info:
891                    CreateStreamingJobCommandInfo {
892                        stream_job_fragments: table_fragments,
893                        init_split_assignment: split_assignment,
894                        upstream_fragment_downstreams,
895                        fragment_backfill_ordering,
896                        cdc_table_snapshot_split_assignment,
897                        ..
898                    },
899                job_type,
900                ..
901            } => {
902                let edges = edges.as_mut().expect("should exist");
903                let added_actors = table_fragments.actor_ids();
904                let actor_splits = split_assignment
905                    .values()
906                    .flat_map(build_actor_connector_splits)
907                    .collect();
908                let subscriptions_to_add =
909                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
910                        job_type
911                    {
912                        snapshot_backfill_info
913                            .upstream_mv_table_id_to_backfill_epoch
914                            .keys()
915                            .map(|table_id| SubscriptionUpstreamInfo {
916                                subscriber_id: table_fragments.stream_job_id().table_id,
917                                upstream_mv_table_id: table_id.table_id,
918                            })
919                            .collect()
920                    } else {
921                        Default::default()
922                    };
923                let backfill_nodes_to_pause: Vec<_> =
924                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
925                        .into_iter()
926                        .collect();
927
928                let new_upstream_sinks =
929                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
930                        sink_fragment_id,
931                        sink_output_fields,
932                        project_exprs,
933                        new_sink_downstream,
934                        ..
935                    }) = job_type
936                    {
937                        let new_sink_actors = table_fragments
938                            .actors_to_create()
939                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
940                            .exactly_one()
941                            .map(|(_, _, actors)| {
942                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
943                                    actor_id: actor.actor_id,
944                                    host: Some(control_stream_manager.host_addr(worker_id)),
945                                })
946                            })
947                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
948                        let new_upstream_sink = PbNewUpstreamSink {
949                            info: Some(PbUpstreamSinkInfo {
950                                upstream_fragment_id: *sink_fragment_id,
951                                sink_output_schema: sink_output_fields.clone(),
952                                project_exprs: project_exprs.clone(),
953                            }),
954                            upstream_actors: new_sink_actors.collect(),
955                        };
956                        HashMap::from([(
957                            new_sink_downstream.downstream_fragment_id,
958                            new_upstream_sink.clone(),
959                        )])
960                    } else {
961                        HashMap::new()
962                    };
963
964                let add_mutation = AddMutation {
965                    actor_dispatchers: edges
966                        .dispatchers
967                        .extract_if(|fragment_id, _| {
968                            upstream_fragment_downstreams.contains_key(fragment_id)
969                        })
970                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
971                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
972                        .collect(),
973                    added_actors,
974                    actor_splits,
975                    // If the cluster is already paused, the new actors should be paused too.
976                    pause: is_currently_paused,
977                    subscriptions_to_add,
978                    backfill_nodes_to_pause,
979                    actor_cdc_table_snapshot_splits:
980                        build_pb_actor_cdc_table_snapshot_splits_with_generation(
981                            cdc_table_snapshot_split_assignment.clone(),
982                        )
983                        .into(),
984                    new_upstream_sinks,
985                };
986
987                Some(Mutation::Add(add_mutation))
988            }
989            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
990                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
991                    info: jobs_to_merge
992                        .iter()
993                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
994                            backfill_upstream_tables
995                                .iter()
996                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
997                                    subscriber_id: table_id.table_id,
998                                    upstream_mv_table_id: upstream_table_id.table_id,
999                                })
1000                        })
1001                        .collect(),
1002                }))
1003            }
1004
1005            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1006                old_fragments,
1007                replace_upstream,
1008                upstream_fragment_downstreams,
1009                init_split_assignment,
1010                auto_refresh_schema_sinks,
1011                cdc_table_snapshot_split_assignment,
1012                ..
1013            }) => {
1014                let edges = edges.as_mut().expect("should exist");
1015                let merge_updates = edges
1016                    .merge_updates
1017                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1018                    .collect();
1019                let dispatchers = edges
1020                    .dispatchers
1021                    .extract_if(|fragment_id, _| {
1022                        upstream_fragment_downstreams.contains_key(fragment_id)
1023                    })
1024                    .collect();
1025                let cdc_table_snapshot_split_assignment =
1026                    if cdc_table_snapshot_split_assignment.is_empty() {
1027                        CdcTableSnapshotSplitAssignmentWithGeneration::empty()
1028                    } else {
1029                        CdcTableSnapshotSplitAssignmentWithGeneration::new(
1030                            cdc_table_snapshot_split_assignment.clone(),
1031                            control_stream_manager
1032                                .env
1033                                .cdc_table_backfill_tracker
1034                                .next_generation(iter::once(old_fragments.stream_job_id.table_id)),
1035                        )
1036                    };
1037                Self::generate_update_mutation_for_replace_table(
1038                    old_fragments.actor_ids().into_iter().chain(
1039                        auto_refresh_schema_sinks
1040                            .as_ref()
1041                            .into_iter()
1042                            .flat_map(|sinks| {
1043                                sinks.iter().flat_map(|sink| {
1044                                    sink.original_fragment
1045                                        .actors
1046                                        .iter()
1047                                        .map(|actor| actor.actor_id)
1048                                })
1049                            }),
1050                    ),
1051                    merge_updates,
1052                    dispatchers,
1053                    init_split_assignment,
1054                    cdc_table_snapshot_split_assignment,
1055                    auto_refresh_schema_sinks.as_ref(),
1056                )
1057            }
1058
1059            Command::RescheduleFragment {
1060                reschedules,
1061                fragment_actors,
1062                ..
1063            } => {
1064                let mut dispatcher_update = HashMap::new();
1065                for reschedule in reschedules.values() {
1066                    for &(upstream_fragment_id, dispatcher_id) in
1067                        &reschedule.upstream_fragment_dispatcher_ids
1068                    {
1069                        // Find the actors of the upstream fragment.
1070                        let upstream_actor_ids = fragment_actors
1071                            .get(&upstream_fragment_id)
1072                            .expect("should contain");
1073
1074                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1075
1076                        // Record updates for all actors.
1077                        for &actor_id in upstream_actor_ids {
1078                            let added_downstream_actor_id = if upstream_reschedule
1079                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1080                                .unwrap_or(true)
1081                            {
1082                                reschedule
1083                                    .added_actors
1084                                    .values()
1085                                    .flatten()
1086                                    .cloned()
1087                                    .collect()
1088                            } else {
1089                                Default::default()
1090                            };
1091                            // Index with the dispatcher id to check duplicates.
1092                            dispatcher_update
1093                                .try_insert(
1094                                    (actor_id, dispatcher_id),
1095                                    DispatcherUpdate {
1096                                        actor_id,
1097                                        dispatcher_id,
1098                                        hash_mapping: reschedule
1099                                            .upstream_dispatcher_mapping
1100                                            .as_ref()
1101                                            .map(|m| m.to_protobuf()),
1102                                        added_downstream_actor_id,
1103                                        removed_downstream_actor_id: reschedule
1104                                            .removed_actors
1105                                            .iter()
1106                                            .cloned()
1107                                            .collect(),
1108                                    },
1109                                )
1110                                .unwrap();
1111                        }
1112                    }
1113                }
1114                let dispatcher_update = dispatcher_update.into_values().collect();
1115
1116                let mut merge_update = HashMap::new();
1117                for (&fragment_id, reschedule) in reschedules {
1118                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1119                        // Find the actors of the downstream fragment.
1120                        let downstream_actor_ids = fragment_actors
1121                            .get(&downstream_fragment_id)
1122                            .expect("should contain");
1123
                        // Downstream removed actors should be skipped: newly created actors of the
                        // current fragment will not dispatch `Update` barriers to them.
1127                        let downstream_removed_actors: HashSet<_> = reschedules
1128                            .get(&downstream_fragment_id)
1129                            .map(|downstream_reschedule| {
1130                                downstream_reschedule
1131                                    .removed_actors
1132                                    .iter()
1133                                    .copied()
1134                                    .collect()
1135                            })
1136                            .unwrap_or_default();
1137
1138                        // Record updates for all actors.
1139                        for &actor_id in downstream_actor_ids {
1140                            if downstream_removed_actors.contains(&actor_id) {
1141                                continue;
1142                            }
1143
1144                            // Index with the fragment id to check duplicates.
1145                            merge_update
1146                                .try_insert(
1147                                    (actor_id, fragment_id),
1148                                    MergeUpdate {
1149                                        actor_id,
1150                                        upstream_fragment_id: fragment_id,
1151                                        new_upstream_fragment_id: None,
1152                                        added_upstream_actors: reschedule
1153                                            .added_actors
1154                                            .iter()
1155                                            .flat_map(|(worker_id, actors)| {
1156                                                let host =
1157                                                    control_stream_manager.host_addr(*worker_id);
1158                                                actors.iter().map(move |actor_id| PbActorInfo {
1159                                                    actor_id: *actor_id,
1160                                                    host: Some(host.clone()),
1161                                                })
1162                                            })
1163                                            .collect(),
1164                                        removed_upstream_actor_id: reschedule
1165                                            .removed_actors
1166                                            .iter()
1167                                            .cloned()
1168                                            .collect(),
1169                                    },
1170                                )
1171                                .unwrap();
1172                        }
1173                    }
1174                }
1175                let merge_update = merge_update.into_values().collect();
1176
1177                let mut actor_vnode_bitmap_update = HashMap::new();
1178                for reschedule in reschedules.values() {
1179                    // Record updates for all actors in this fragment.
1180                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1181                        let bitmap = bitmap.to_protobuf();
1182                        actor_vnode_bitmap_update
1183                            .try_insert(actor_id, bitmap)
1184                            .unwrap();
1185                    }
1186                }
1187                let dropped_actors = reschedules
1188                    .values()
1189                    .flat_map(|r| r.removed_actors.iter().copied())
1190                    .collect();
1191                let mut actor_splits = HashMap::new();
1192                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1193                let mut cdc_table_ids: HashSet<_> = HashSet::default();
1194                for reschedule in reschedules.values() {
1195                    for (actor_id, splits) in &reschedule.actor_splits {
1196                        actor_splits.insert(
1197                            *actor_id as ActorId,
1198                            ConnectorSplits {
1199                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1200                            },
1201                        );
1202                    }
1203                    actor_cdc_table_snapshot_splits.extend(
1204                        build_pb_actor_cdc_table_snapshot_splits(
1205                            reschedule.cdc_table_snapshot_split_assignment.clone(),
1206                        ),
1207                    );
1208                    if let Some(cdc_table_id) = reschedule.cdc_table_id {
1209                        cdc_table_ids.insert(cdc_table_id);
1210                    }
1211                }
1212
1213                // we don't create dispatchers in reschedule scenario
1214                let actor_new_dispatchers = HashMap::new();
                let actor_cdc_table_snapshot_splits = if actor_cdc_table_snapshot_splits.is_empty()
                {
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        CdcTableSnapshotSplitAssignmentWithGeneration::empty(),
                    )
                    .into()
                } else {
                    PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                        generation: control_stream_manager
                            .env
                            .cdc_table_backfill_tracker
                            .next_generation(cdc_table_ids.into_iter()),
                    }
                    .into()
                };
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits,
                    sink_add_columns: Default::default(),
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

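            // A new subscription is propagated as an `Add` mutation that only carries the
            // subscription info; no actors are added and nothing is paused.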
            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: Default::default(),
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
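            // Repackage the changed connector properties into the protobuf
            // `ConnectorPropsChangeMutation`.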
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::StartFragmentBackfill { fragment_ids } => Some(
                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.clone(),
                }),
            ),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: table_id.table_id,
                    associated_source_id: associated_source_id.table_id,
                },
            )),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            })),
        }
    }

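    /// Collects the actors that this command requires to be newly built, grouped per worker
    /// and fragment. Returns `None` if no actors need to be created here (e.g. snapshot
    /// backfill jobs, whose actors are handled separately).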
    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    // For snapshot backfill, the actors to create are computed separately.
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
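                // Resolve, for every newly created actor, the upstream actors it should connect
                // to, based on the dispatchers of the newly created upstream actors and the
                // actors of rescheduled upstream fragments.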
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
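                // Group the newly created actors by worker and fragment, attaching the
                // fragment's stream node plan and each actor's resolved upstreams and
                // dispatchers.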
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
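            // Build the actors of the replacement fragments; when auto-refresh-schema sinks
            // are replaced together with the job, the actors of their new sink fragments are
            // built as well.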
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors =
                    edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create());
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id as _,
                                )
                            }),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

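    /// Builds the `Update` mutation used when replacing a stream job's fragments: drops the
    /// old actors, applies the merge updates, and installs the new dispatchers, split
    /// assignments and CDC table snapshot splits; for auto-refresh-schema sinks, it also
    /// announces the newly added sink columns.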
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    cdc_table_snapshot_split_assignment,
                )
                .into(),
            sink_add_columns: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id,
                            PbSinkAddColumns {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to be dropped.
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
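    /// Collects, for each downstream actor, the upstream actors (with their host addresses)
    /// it should register: first from the given dispatchers of upstream actors, and, for
    /// reschedules, additionally from the existing actors of the upstream fragments.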
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
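        // For reschedules, also register the upstream fragments' existing actors (skipping
        // those being removed) as upstreams of every newly added downstream actor.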
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}