risingwave_meta/barrier/command.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::ActorMapping;
22use risingwave_common::must_match;
23use risingwave_common::types::Timestamptz;
24use risingwave_common::util::epoch::Epoch;
25use risingwave_connector::source::SplitImpl;
26use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::catalog::{CreateType, Table};
29use risingwave_pb::common::ActorInfo;
30use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
31use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
32use risingwave_pb::stream_plan::barrier_mutation::Mutation;
33use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
34use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
35use risingwave_pb::stream_plan::update_mutation::*;
36use risingwave_pb::stream_plan::{
37    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
38    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
39    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
40    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
41};
42use risingwave_pb::stream_service::BarrierCompleteResponse;
43use tracing::warn;
44
45use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
46use crate::barrier::InflightSubscriptionInfo;
47use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
48use crate::barrier::edge_builder::FragmentEdgeBuildResult;
49use crate::barrier::info::BarrierInfo;
50use crate::barrier::rpc::ControlStreamManager;
51use crate::barrier::utils::collect_resp_info;
52use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
53use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
54use crate::manager::{StreamingJob, StreamingJobType};
55use crate::model::{
56    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
57    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
58    StreamJobFragments, StreamJobFragmentsToCreate,
59};
60use crate::stream::{
61    ConnectorPropsChange, FragmentBackfillOrder, JobReschedulePostUpdates, SplitAssignment,
62    ThrottleConfig, build_actor_connector_splits,
63};
64
/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, e.g. when scaling or migrating.
67#[derive(Debug, Clone)]
68pub struct Reschedule {
69    /// Added actors in this fragment.
70    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
71
72    /// Removed actors in this fragment.
73    pub removed_actors: HashSet<ActorId>,
74
75    /// Vnode bitmap updates for some actors in this fragment.
76    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
77
78    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
79    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
80    /// New hash mapping of the upstream dispatcher to be updated.
81    ///
    /// This field exists only when there is an upstream fragment and the current fragment is
    /// hash-sharded.
84    pub upstream_dispatcher_mapping: Option<ActorMapping>,
85
86    /// The downstream fragments of this fragment.
87    pub downstream_fragment_ids: Vec<FragmentId>,
88
89    /// Reassigned splits for source actors.
90    /// It becomes the `actor_splits` in [`UpdateMutation`].
91    /// `Source` and `SourceBackfill` are handled together here.
92    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
93
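    /// The newly created actors of this fragment, along with their dispatchers and the worker
    /// each actor is scheduled on.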
94    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
95}
96
97/// Replacing an old job with a new one. All actors in the job will be rebuilt.
98///
99/// Current use cases:
100/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
101/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
102///   will replace a table job's plan.
103#[derive(Debug, Clone)]
104pub struct ReplaceStreamJobPlan {
105    pub old_fragments: StreamJobFragments,
106    pub new_fragments: StreamJobFragmentsToCreate,
107    /// Downstream jobs of the replaced job need to update their `Merge` node to
108    /// connect to the new fragment.
109    pub replace_upstream: FragmentReplaceUpstream,
110    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new
    /// actor ids, so we need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we
    /// don't need to worry about `backfill_splits`.
116    pub init_split_assignment: SplitAssignment,
117    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
118    pub streaming_job: StreamingJob,
119    /// The temporary dummy job fragments id of new table fragment
120    pub tmp_id: u32,
121    /// The state table ids to be dropped.
122    pub to_drop_state_table_ids: Vec<TableId>,
123}
124
125impl ReplaceStreamJobPlan {
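    /// Collect the per-fragment changes implied by this replace plan: the fragments of the new
    /// job are added, all fragments of the old job are removed, and downstream fragments get
    /// their `Merge` upstream rewritten according to `replace_upstream`.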
126    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
127        let mut fragment_changes = HashMap::new();
128        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
129            let fragment_change =
130                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
131            fragment_changes
132                .try_insert(fragment_id, fragment_change)
133                .expect("non-duplicate");
134        }
135        for fragment in self.old_fragments.fragments.values() {
136            fragment_changes
137                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
138                .expect("non-duplicate");
139        }
140        for (fragment_id, replace_map) in &self.replace_upstream {
141            fragment_changes
142                .try_insert(
143                    *fragment_id,
144                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
145                )
146                .expect("non-duplicate");
147        }
148        fragment_changes
149    }
150
151    /// `old_fragment_id` -> `new_fragment_id`
152    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
153        let mut fragment_replacements = HashMap::new();
154        for (upstream_fragment_id, new_upstream_fragment_id) in
155            self.replace_upstream.values().flatten()
156        {
157            {
158                let r =
159                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
160                if let Some(r) = r {
161                    assert_eq!(
162                        *new_upstream_fragment_id, r,
163                        "one fragment is replaced by multiple fragments"
164                    );
165                }
166            }
167        }
168        fragment_replacements
169    }
170}
171
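/// All the information needed by [`Command::CreateStreamingJob`] to build the new job: the
/// fragments and actors to create, their connection to upstream fragments, the initial source
/// split assignment, the backfill ordering, and catalog metadata such as the definition and
/// internal tables.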
172#[derive(educe::Educe, Clone)]
173#[educe(Debug)]
174pub struct CreateStreamingJobCommandInfo {
175    #[educe(Debug(ignore))]
176    pub stream_job_fragments: StreamJobFragmentsToCreate,
177    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
178    pub init_split_assignment: SplitAssignment,
179    pub definition: String,
180    pub job_type: StreamingJobType,
181    pub create_type: CreateType,
182    pub streaming_job: StreamingJob,
183    pub internal_tables: Vec<Table>,
184    pub fragment_backfill_ordering: FragmentBackfillOrder,
185}
186
187impl StreamJobFragments {
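    /// Build the in-flight info (node plan, actor-to-worker assignment, vnode bitmaps and state
    /// table ids) for every fragment of this job, as tracked by the barrier manager.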
188    pub(super) fn new_fragment_info(
189        &self,
190    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
191        self.fragments.values().map(|fragment| {
192            (
193                fragment.fragment_id,
194                InflightFragmentInfo {
195                    fragment_id: fragment.fragment_id,
196                    distribution_type: fragment.distribution_type.into(),
197                    nodes: fragment.nodes.clone(),
198                    actors: fragment
199                        .actors
200                        .iter()
201                        .map(|actor| {
202                            (
203                                actor.actor_id,
204                                InflightActorInfo {
205                                    worker_id: self
206                                        .actor_status
207                                        .get(&actor.actor_id)
208                                        .expect("should exist")
209                                        .worker_id()
210                                        as WorkerId,
211                                    vnode_bitmap: actor.vnode_bitmap.clone(),
212                                },
213                            )
214                        })
215                        .collect(),
216                    state_table_ids: fragment
217                        .state_table_ids
218                        .iter()
219                        .map(|table_id| TableId::new(*table_id))
220                        .collect(),
221                },
222            )
223        })
224    }
225}
226
227#[derive(Debug, Clone)]
228pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    ///
    /// The `snapshot_backfill_epoch` is `None` at the beginning and is filled in
    /// by the global barrier worker when handling the command.
232    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
233}
234
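/// How the streaming job in [`Command::CreateStreamingJob`] is created: as a plain job, as a sink
/// that simultaneously replaces the plan of the table it writes into, or through snapshot
/// backfill on its upstream materialized views.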
235#[derive(Debug, Clone)]
236pub enum CreateStreamingJobType {
237    Normal,
238    SinkIntoTable(ReplaceStreamJobPlan),
239    SnapshotBackfill(SnapshotBackfillInfo),
240}
241
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different work after the barrier is collected](CommandContext::post_collect).
245#[derive(Debug, strum::Display)]
246pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and
    /// committed, all messages before the checkpoint barrier should have been committed.
249    Flush,
250
251    /// `Pause` command generates a `Pause` barrier **only if**
252    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
253    Pause,
254
255    /// `Resume` command generates a `Resume` barrier **only
256    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
257    /// will be generated.
258    Resume,
259
    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of the compute nodes to
    /// drop the actors, and then deletes the job fragments info from the meta store.
267    DropStreamingJobs {
268        table_fragments_ids: HashSet<TableId>,
269        actors: Vec<ActorId>,
270        unregistered_state_table_ids: HashSet<TableId>,
271        unregistered_fragment_ids: HashSet<FragmentId>,
272    },
273
    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passed through.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will
    /// **last for a while** until the `finish` channel is signaled; only then is the state of
    /// `TableFragments` set to `Created`.
283    CreateStreamingJob {
284        info: CreateStreamingJobCommandInfo,
285        job_type: CreateStreamingJobType,
286        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
287    },
288    MergeSnapshotBackfillStreamingJobs(
289        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
290    ),
291
    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, and the post-collect behavior of this
    /// command, are very similar to the `Create` and `Drop` commands, for added and removed actors
    /// respectively.
297    RescheduleFragment {
298        reschedules: HashMap<FragmentId, Reschedule>,
299        // Should contain the actor ids in upstream and downstream fragment of `reschedules`
300        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
301        // Used for updating additional metadata after the barrier ends
302        post_updates: JobReschedulePostUpdates,
303    },
304
    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
    /// fragments of the Merge executors are additionally changed.
311    ReplaceStreamJob(ReplaceStreamJobPlan),
312
313    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
314    /// changed splits.
315    SourceChangeSplit(SplitAssignment),
316
    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
319    Throttle(ThrottleConfig),
320
    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
323    CreateSubscription {
324        subscription_id: u32,
325        upstream_mv_table_id: TableId,
326        retention_second: u64,
327    },
328
    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when there is no
    /// subscription depending on it.
332    DropSubscription {
333        subscription_id: u32,
334        upstream_mv_table_id: TableId,
335    },
336
337    ConnectorPropsChange(ConnectorPropsChange),
338
    /// `StartFragmentBackfill` command triggers backfill for the specified scan fragments, identified by `fragment_id`.
340    StartFragmentBackfill {
341        fragment_ids: Vec<FragmentId>,
342    },
343}
344
345impl Command {
346    pub fn pause() -> Self {
347        Self::Pause
348    }
349
350    pub fn resume() -> Self {
351        Self::Resume
352    }
353
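    /// Generate the command used to cancel a creating streaming job: stop all of its actors and
    /// unregister all of its state tables and fragments.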
354    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
355        Self::DropStreamingJobs {
356            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
357            actors: table_fragments.actor_ids(),
358            unregistered_state_table_ids: table_fragments
359                .all_table_ids()
360                .map(TableId::new)
361                .collect(),
362            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
363        }
364    }
365
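    /// The per-fragment changes that this command applies to the in-flight graph info, or `None`
    /// if the command does not change the set of fragments or their actors. (Snapshot-backfill
    /// job creation is handled separately, as asserted below.)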
366    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
367        match self {
368            Command::Flush => None,
369            Command::Pause => None,
370            Command::Resume => None,
371            Command::DropStreamingJobs {
372                unregistered_fragment_ids,
373                ..
374            } => Some(
375                unregistered_fragment_ids
376                    .iter()
377                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
378                    .collect(),
379            ),
380            Command::CreateStreamingJob { info, job_type, .. } => {
381                assert!(
382                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
383                    "should handle fragment changes separately for snapshot backfill"
384                );
385                let mut changes: HashMap<_, _> = info
386                    .stream_job_fragments
387                    .new_fragment_info()
388                    .map(|(fragment_id, fragment_info)| {
389                        (
390                            fragment_id,
391                            CommandFragmentChanges::NewFragment(
392                                info.streaming_job.id().into(),
393                                fragment_info,
394                            ),
395                        )
396                    })
397                    .collect();
398
399                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
400                    let extra_change = plan.fragment_changes();
401                    changes.extend(extra_change);
402                }
403
404                Some(changes)
405            }
406            Command::RescheduleFragment { reschedules, .. } => Some(
407                reschedules
408                    .iter()
409                    .map(|(fragment_id, reschedule)| {
410                        (
411                            *fragment_id,
412                            CommandFragmentChanges::Reschedule {
413                                new_actors: reschedule
414                                    .added_actors
415                                    .iter()
416                                    .flat_map(|(node_id, actors)| {
417                                        actors.iter().map(|actor_id| {
418                                            (
419                                                *actor_id,
420                                                InflightActorInfo {
421                                                    worker_id: *node_id,
422                                                    vnode_bitmap: reschedule
423                                                        .newly_created_actors
424                                                        .get(actor_id)
425                                                        .expect("should exist")
426                                                        .0
427                                                        .0
428                                                        .vnode_bitmap
429                                                        .clone(),
430                                                },
431                                            )
432                                        })
433                                    })
434                                    .collect(),
435                                actor_update_vnode_bitmap: reschedule
436                                    .vnode_bitmap_updates
437                                    .iter()
438                                    .filter(|(actor_id, _)| {
439                                        // only keep the existing actors
440                                        !reschedule.newly_created_actors.contains_key(actor_id)
441                                    })
442                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
443                                    .collect(),
444                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
445                            },
446                        )
447                    })
448                    .collect(),
449            ),
450            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
451            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
452            Command::SourceChangeSplit(_) => None,
453            Command::Throttle(_) => None,
454            Command::CreateSubscription { .. } => None,
455            Command::DropSubscription { .. } => None,
456            Command::ConnectorPropsChange(_) => None,
457            Command::StartFragmentBackfill { .. } => None,
458        }
459    }
460
461    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
463        !matches!(self, Command::Resume)
464    }
465}
466
467#[derive(Debug, Clone)]
468pub enum BarrierKind {
469    Initial,
470    Barrier,
    /// Holds the prev-epochs of the preceding non-checkpoint barriers plus the current prev-epoch.
472    Checkpoint(Vec<u64>),
473}
474
475impl BarrierKind {
476    pub fn to_protobuf(&self) -> PbBarrierKind {
477        match self {
478            BarrierKind::Initial => PbBarrierKind::Initial,
479            BarrierKind::Barrier => PbBarrierKind::Barrier,
480            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
481        }
482    }
483
484    pub fn is_checkpoint(&self) -> bool {
485        matches!(self, BarrierKind::Checkpoint(_))
486    }
487
488    pub fn is_initial(&self) -> bool {
489        matches!(self, BarrierKind::Initial)
490    }
491
492    pub fn as_str_name(&self) -> &'static str {
493        match self {
494            BarrierKind::Initial => "Initial",
495            BarrierKind::Barrier => "Barrier",
496            BarrierKind::Checkpoint(_) => "Checkpoint",
497        }
498    }
499}
500
/// [`CommandContext`] is used for generating the barrier and doing the post-collect work according
/// to the given [`Command`].
503pub(super) struct CommandContext {
504    subscription_info: InflightSubscriptionInfo,
505
506    pub(super) barrier_info: BarrierInfo,
507
508    pub(super) table_ids_to_commit: HashSet<TableId>,
509
510    pub(super) command: Option<Command>,
511
512    /// The tracing span of this command.
513    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
517    _span: tracing::Span,
518}
519
520impl std::fmt::Debug for CommandContext {
521    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
522        f.debug_struct("CommandContext")
523            .field("barrier_info", &self.barrier_info)
524            .field("command", &self.command)
525            .finish()
526    }
527}
528
529impl CommandContext {
530    pub(super) fn new(
531        barrier_info: BarrierInfo,
532        subscription_info: InflightSubscriptionInfo,
533        table_ids_to_commit: HashSet<TableId>,
534        command: Option<Command>,
535        span: tracing::Span,
536    ) -> Self {
537        Self {
538            subscription_info,
539            barrier_info,
540            table_ids_to_commit,
541            command,
542            _span: span,
543        }
544    }
545
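    /// Compute the epoch below which the log store of a subscribed MV may be truncated: the
    /// physical time of `prev_epoch` minus `retention_second`, converted back to an epoch. If the
    /// subtraction falls outside the valid timestamp range, fall back to `prev_epoch` itself.
    ///
    /// For example (illustrative numbers only): with a `prev_epoch` whose physical time is
    /// `00:10:00` and `retention_second = 600`, the returned truncate epoch corresponds to
    /// physical time `00:00:00`.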
546    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
547        let Some(truncate_timestamptz) = Timestamptz::from_secs(
548            self.barrier_info
549                .prev_epoch
550                .value()
551                .as_timestamptz()
552                .timestamp()
553                - retention_second as i64,
554        ) else {
555            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
556            return self.barrier_info.prev_epoch.value();
557        };
558        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
559    }
560
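    /// Fold the `BarrierCompleteResponse`s of this barrier into `info`: synced SSTs and their
    /// contexts, new table watermarks, the table ids of a newly created job (if any), the epoch to
    /// commit for every table in `table_ids_to_commit`, and the change-log delta whose truncation
    /// is bounded by the most demanding subscription retention and pinned backfill epochs.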
561    pub(super) fn collect_commit_epoch_info(
562        &self,
563        info: &mut CommitEpochInfo,
564        resps: Vec<BarrierCompleteResponse>,
565        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
566    ) {
567        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
568            collect_resp_info(resps);
569
570        let new_table_fragment_infos =
571            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
572                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
573            {
574                let table_fragments = &info.stream_job_fragments;
575                let mut table_ids: HashSet<_> = table_fragments
576                    .internal_table_ids()
577                    .into_iter()
578                    .map(TableId::new)
579                    .collect();
580                if let Some(mv_table_id) = table_fragments.mv_table_id() {
581                    table_ids.insert(TableId::new(mv_table_id));
582                }
583
584                vec![NewTableFragmentInfo { table_ids }]
585            } else {
586                vec![]
587            };
588
589        let mut mv_log_store_truncate_epoch = HashMap::new();
590        // TODO: may collect cross db snapshot backfill
591        let mut update_truncate_epoch =
592            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
593                .entry(table_id.table_id)
594            {
595                Entry::Occupied(mut entry) => {
596                    let prev_truncate_epoch = entry.get_mut();
597                    if truncate_epoch < *prev_truncate_epoch {
598                        *prev_truncate_epoch = truncate_epoch;
599                    }
600                }
601                Entry::Vacant(entry) => {
602                    entry.insert(truncate_epoch);
603                }
604            };
605        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
606            if let Some(truncate_epoch) = subscriptions
607                .values()
608                .max()
609                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
610            {
611                update_truncate_epoch(*mv_table_id, truncate_epoch);
612            }
613        }
614        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
615            for mv_table_id in upstream_mv_table_ids {
616                update_truncate_epoch(mv_table_id, backfill_epoch);
617            }
618        }
619
620        let table_new_change_log = build_table_change_log_delta(
621            old_value_ssts.into_iter(),
622            synced_ssts.iter().map(|sst| &sst.sst_info),
623            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
624            mv_log_store_truncate_epoch.into_iter(),
625        );
626
627        let epoch = self.barrier_info.prev_epoch();
628        for table_id in &self.table_ids_to_commit {
629            info.tables_to_commit
630                .try_insert(*table_id, epoch)
631                .expect("non duplicate");
632        }
633
634        info.sstables.extend(synced_ssts);
635        info.new_table_watermarks.extend(new_table_watermarks);
636        info.sst_to_context.extend(sst_to_context);
637        info.new_table_fragment_infos
638            .extend(new_table_fragment_infos);
639        info.change_log_delta.extend(table_new_change_log);
640    }
641}
642
643impl Command {
644    /// Generate a mutation for the given command.
645    ///
    /// `edges` contains the `dispatcher`s of each `DispatchExecutor` and the `actor_upstreams` of each `MergeNode`.
647    pub(super) fn to_mutation(
648        &self,
649        is_currently_paused: bool,
650        edges: &mut Option<FragmentEdgeBuildResult>,
651        control_stream_manager: &ControlStreamManager,
652    ) -> Option<Mutation> {
653        match self {
654            Command::Flush => None,
655
656            Command::Pause => {
657                // Only pause when the cluster is not already paused.
658                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
659                if !is_currently_paused {
660                    Some(Mutation::Pause(PauseMutation {}))
661                } else {
662                    None
663                }
664            }
665
666            Command::Resume => {
667                // Only resume when the cluster is paused with the same reason.
668                if is_currently_paused {
669                    Some(Mutation::Resume(ResumeMutation {}))
670                } else {
671                    None
672                }
673            }
674
675            Command::SourceChangeSplit(change) => {
676                let mut diff = HashMap::new();
677
678                for actor_splits in change.values() {
679                    diff.extend(actor_splits.clone());
680                }
681
682                Some(Mutation::Splits(SourceChangeSplitMutation {
683                    actor_splits: build_actor_connector_splits(&diff),
684                }))
685            }
686
687            Command::Throttle(config) => {
688                let mut actor_to_apply = HashMap::new();
689                for per_fragment in config.values() {
690                    actor_to_apply.extend(
691                        per_fragment
692                            .iter()
693                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
694                    );
695                }
696
697                Some(Mutation::Throttle(ThrottleMutation {
698                    actor_throttle: actor_to_apply,
699                }))
700            }
701
702            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
703                actors: actors.clone(),
704            })),
705
706            Command::CreateStreamingJob {
707                info:
708                    CreateStreamingJobCommandInfo {
709                        stream_job_fragments: table_fragments,
710                        init_split_assignment: split_assignment,
711                        upstream_fragment_downstreams,
712                        fragment_backfill_ordering,
713                        ..
714                    },
715                job_type,
716                ..
717            } => {
718                let edges = edges.as_mut().expect("should exist");
719                let added_actors = table_fragments.actor_ids();
720                let actor_splits = split_assignment
721                    .values()
722                    .flat_map(build_actor_connector_splits)
723                    .collect();
724                let subscriptions_to_add =
725                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
726                        job_type
727                    {
728                        snapshot_backfill_info
729                            .upstream_mv_table_id_to_backfill_epoch
730                            .keys()
731                            .map(|table_id| SubscriptionUpstreamInfo {
732                                subscriber_id: table_fragments.stream_job_id().table_id,
733                                upstream_mv_table_id: table_id.table_id,
734                            })
735                            .collect()
736                    } else {
737                        Default::default()
738                    };
739                let backfill_nodes_to_pause: Vec<_> =
740                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
741                        .into_iter()
742                        .collect();
743                let add = Some(Mutation::Add(AddMutation {
744                    actor_dispatchers: edges
745                        .dispatchers
746                        .extract_if(|fragment_id, _| {
747                            upstream_fragment_downstreams.contains_key(fragment_id)
748                        })
749                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
750                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
751                        .collect(),
752                    added_actors,
753                    actor_splits,
754                    // If the cluster is already paused, the new actors should be paused too.
755                    pause: is_currently_paused,
756                    subscriptions_to_add,
757                    backfill_nodes_to_pause,
758                }));
759
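                // For `CREATE SINK ... INTO table`, the table's fragments are replaced in the same
                // barrier, so the `Add` mutation is combined with an `Update` mutation that rewires
                // the downstream `Merge` nodes and registers the new dispatchers.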
760                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
761                    old_fragments,
762                    init_split_assignment,
763                    replace_upstream,
764                    upstream_fragment_downstreams,
765                    ..
766                }) = job_type
767                {
768                    let merge_updates = edges
769                        .merge_updates
770                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
771                        .collect();
772                    let dispatchers = edges
773                        .dispatchers
774                        .extract_if(|fragment_id, _| {
775                            upstream_fragment_downstreams.contains_key(fragment_id)
776                        })
777                        .collect();
778                    let update = Self::generate_update_mutation_for_replace_table(
779                        old_fragments,
780                        merge_updates,
781                        dispatchers,
782                        init_split_assignment,
783                    );
784
785                    Some(Mutation::Combined(CombinedMutation {
786                        mutations: vec![
787                            BarrierMutation { mutation: add },
788                            BarrierMutation { mutation: update },
789                        ],
790                    }))
791                } else {
792                    add
793                }
794            }
795            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
796                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
797                    info: jobs_to_merge
798                        .iter()
799                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
800                            backfill_upstream_tables
801                                .iter()
802                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
803                                    subscriber_id: table_id.table_id,
804                                    upstream_mv_table_id: upstream_table_id.table_id,
805                                })
806                        })
807                        .collect(),
808                }))
809            }
810
811            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
812                old_fragments,
813                replace_upstream,
814                upstream_fragment_downstreams,
815                init_split_assignment,
816                ..
817            }) => {
818                let edges = edges.as_mut().expect("should exist");
819                let merge_updates = edges
820                    .merge_updates
821                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
822                    .collect();
823                let dispatchers = edges
824                    .dispatchers
825                    .extract_if(|fragment_id, _| {
826                        upstream_fragment_downstreams.contains_key(fragment_id)
827                    })
828                    .collect();
829                Self::generate_update_mutation_for_replace_table(
830                    old_fragments,
831                    merge_updates,
832                    dispatchers,
833                    init_split_assignment,
834                )
835            }
836
837            Command::RescheduleFragment {
838                reschedules,
839                fragment_actors,
840                ..
841            } => {
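                // A reschedule turns into a single `Update` mutation assembled from four parts:
                // dispatcher updates for upstream fragments, merge updates for downstream
                // fragments, vnode bitmap updates for surviving actors, plus the dropped actors
                // and the reassigned source splits.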
842                let mut dispatcher_update = HashMap::new();
843                for reschedule in reschedules.values() {
844                    for &(upstream_fragment_id, dispatcher_id) in
845                        &reschedule.upstream_fragment_dispatcher_ids
846                    {
847                        // Find the actors of the upstream fragment.
848                        let upstream_actor_ids = fragment_actors
849                            .get(&upstream_fragment_id)
850                            .expect("should contain");
851
852                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
853
854                        // Record updates for all actors.
855                        for &actor_id in upstream_actor_ids {
856                            let added_downstream_actor_id = if upstream_reschedule
857                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
858                                .unwrap_or(true)
859                            {
860                                reschedule
861                                    .added_actors
862                                    .values()
863                                    .flatten()
864                                    .cloned()
865                                    .collect()
866                            } else {
867                                Default::default()
868                            };
869                            // Index with the dispatcher id to check duplicates.
870                            dispatcher_update
871                                .try_insert(
872                                    (actor_id, dispatcher_id),
873                                    DispatcherUpdate {
874                                        actor_id,
875                                        dispatcher_id,
876                                        hash_mapping: reschedule
877                                            .upstream_dispatcher_mapping
878                                            .as_ref()
879                                            .map(|m| m.to_protobuf()),
880                                        added_downstream_actor_id,
881                                        removed_downstream_actor_id: reschedule
882                                            .removed_actors
883                                            .iter()
884                                            .cloned()
885                                            .collect(),
886                                    },
887                                )
888                                .unwrap();
889                        }
890                    }
891                }
892                let dispatcher_update = dispatcher_update.into_values().collect();
893
894                let mut merge_update = HashMap::new();
895                for (&fragment_id, reschedule) in reschedules {
896                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
897                        // Find the actors of the downstream fragment.
898                        let downstream_actor_ids = fragment_actors
899                            .get(&downstream_fragment_id)
900                            .expect("should contain");
901
902                        // Downstream removed actors should be skipped
903                        // Newly created actors of the current fragment will not dispatch Update
904                        // barriers to them
905                        let downstream_removed_actors: HashSet<_> = reschedules
906                            .get(&downstream_fragment_id)
907                            .map(|downstream_reschedule| {
908                                downstream_reschedule
909                                    .removed_actors
910                                    .iter()
911                                    .copied()
912                                    .collect()
913                            })
914                            .unwrap_or_default();
915
916                        // Record updates for all actors.
917                        for &actor_id in downstream_actor_ids {
918                            if downstream_removed_actors.contains(&actor_id) {
919                                continue;
920                            }
921
922                            // Index with the fragment id to check duplicates.
923                            merge_update
924                                .try_insert(
925                                    (actor_id, fragment_id),
926                                    MergeUpdate {
927                                        actor_id,
928                                        upstream_fragment_id: fragment_id,
929                                        new_upstream_fragment_id: None,
930                                        added_upstream_actors: reschedule
931                                            .added_actors
932                                            .iter()
933                                            .flat_map(|(worker_id, actors)| {
934                                                let host =
935                                                    control_stream_manager.host_addr(*worker_id);
936                                                actors.iter().map(move |actor_id| ActorInfo {
937                                                    actor_id: *actor_id,
938                                                    host: Some(host.clone()),
939                                                })
940                                            })
941                                            .collect(),
942                                        removed_upstream_actor_id: reschedule
943                                            .removed_actors
944                                            .iter()
945                                            .cloned()
946                                            .collect(),
947                                    },
948                                )
949                                .unwrap();
950                        }
951                    }
952                }
953                let merge_update = merge_update.into_values().collect();
954
955                let mut actor_vnode_bitmap_update = HashMap::new();
956                for reschedule in reschedules.values() {
957                    // Record updates for all actors in this fragment.
958                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
959                        let bitmap = bitmap.to_protobuf();
960                        actor_vnode_bitmap_update
961                            .try_insert(actor_id, bitmap)
962                            .unwrap();
963                    }
964                }
965                let dropped_actors = reschedules
966                    .values()
967                    .flat_map(|r| r.removed_actors.iter().copied())
968                    .collect();
969                let mut actor_splits = HashMap::new();
970
971                for reschedule in reschedules.values() {
972                    for (actor_id, splits) in &reschedule.actor_splits {
973                        actor_splits.insert(
974                            *actor_id as ActorId,
975                            ConnectorSplits {
976                                splits: splits.iter().map(ConnectorSplit::from).collect(),
977                            },
978                        );
979                    }
980                }
981
982                // we don't create dispatchers in reschedule scenario
983                let actor_new_dispatchers = HashMap::new();
984
985                let mutation = Mutation::Update(UpdateMutation {
986                    dispatcher_update,
987                    merge_update,
988                    actor_vnode_bitmap_update,
989                    dropped_actors,
990                    actor_splits,
991                    actor_new_dispatchers,
992                });
993                tracing::debug!("update mutation: {mutation:?}");
994                Some(mutation)
995            }
996
997            Command::CreateSubscription {
998                upstream_mv_table_id,
999                subscription_id,
1000                ..
1001            } => Some(Mutation::Add(AddMutation {
1002                actor_dispatchers: Default::default(),
1003                added_actors: vec![],
1004                actor_splits: Default::default(),
1005                pause: false,
1006                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1007                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1008                    subscriber_id: *subscription_id,
1009                }],
1010                backfill_nodes_to_pause: vec![],
1011            })),
1012            Command::DropSubscription {
1013                upstream_mv_table_id,
1014                subscription_id,
1015            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1016                info: vec![SubscriptionUpstreamInfo {
1017                    subscriber_id: *subscription_id,
1018                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1019                }],
1020            })),
1021            Command::ConnectorPropsChange(config) => {
1022                let mut connector_props_infos = HashMap::default();
1023                for (k, v) in config {
1024                    connector_props_infos.insert(
1025                        *k,
1026                        ConnectorPropsInfo {
1027                            connector_props_info: v.clone(),
1028                        },
1029                    );
1030                }
1031                Some(Mutation::ConnectorPropsChange(
1032                    ConnectorPropsChangeMutation {
1033                        connector_props_infos,
1034                    },
1035                ))
1036            }
1037            Command::StartFragmentBackfill { fragment_ids } => Some(
1038                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1039                    fragment_ids: fragment_ids.clone(),
1040                }),
1041            ),
1042        }
1043    }
1044
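    /// The new actors that must be built on compute nodes for this command, grouped by worker and
    /// then fragment. Only `CreateStreamingJob` (except snapshot backfill, which is handled
    /// separately), `RescheduleFragment` and `ReplaceStreamJob` produce actors here; other
    /// commands return `None`.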
1045    pub(super) fn actors_to_create(
1046        &self,
1047        graph_info: &InflightDatabaseInfo,
1048        edges: &mut Option<FragmentEdgeBuildResult>,
1049        control_stream_manager: &ControlStreamManager,
1050    ) -> Option<StreamJobActorsToCreate> {
1051        match self {
1052            Command::CreateStreamingJob { info, job_type, .. } => {
1053                let sink_into_table_replace_plan = match job_type {
1054                    CreateStreamingJobType::Normal => None,
1055                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
1056                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // For snapshot backfill, the actors to create are handled separately.
1058                        return None;
1059                    }
1060                };
1061                let get_actors_to_create = || {
1062                    sink_into_table_replace_plan
1063                        .map(|plan| plan.new_fragments.actors_to_create())
1064                        .into_iter()
1065                        .flatten()
1066                        .chain(info.stream_job_fragments.actors_to_create())
1067                };
1068                let edges = edges.as_mut().expect("should exist");
1069                Some(edges.collect_actors_to_create(get_actors_to_create()))
1070            }
1071            Command::RescheduleFragment {
1072                reschedules,
1073                fragment_actors,
1074                ..
1075            } => {
1076                let mut actor_upstreams = Self::collect_actor_upstreams(
1077                    reschedules.iter().map(|(fragment_id, reschedule)| {
1078                        (
1079                            *fragment_id,
1080                            reschedule.newly_created_actors.values().map(
1081                                |((actor, dispatchers), _)| {
1082                                    (actor.actor_id, dispatchers.as_slice())
1083                                },
1084                            ),
1085                        )
1086                    }),
1087                    Some((reschedules, fragment_actors)),
1088                    graph_info,
1089                    control_stream_manager,
1090                );
1091                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
1092                for (fragment_id, (actor, dispatchers), worker_id) in
1093                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1094                        reschedule
1095                            .newly_created_actors
1096                            .values()
1097                            .map(|(actors, status)| (*fragment_id, actors, status))
1098                    })
1099                {
1100                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1101                    map.entry(*worker_id)
1102                        .or_default()
1103                        .entry(fragment_id)
1104                        .or_insert_with(|| {
1105                            let node = graph_info.fragment(fragment_id).nodes.clone();
1106                            (node, vec![])
1107                        })
1108                        .1
1109                        .push((actor.clone(), upstreams, dispatchers.clone()));
1110                }
1111                Some(map)
1112            }
1113            Command::ReplaceStreamJob(replace_table) => {
1114                let edges = edges.as_mut().expect("should exist");
1115                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
1116            }
1117            _ => None,
1118        }
1119    }
1120
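    /// Build the `Update` mutation used when replacing a job: drop every actor of the old
    /// fragments, install the dispatchers of the new fragments, rewrite the downstream `Merge`
    /// nodes via `merge_updates`, and apply the initial source split assignment.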
1121    fn generate_update_mutation_for_replace_table(
1122        old_fragments: &StreamJobFragments,
1123        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1124        dispatchers: FragmentActorDispatchers,
1125        init_split_assignment: &SplitAssignment,
1126    ) -> Option<Mutation> {
1127        let dropped_actors = old_fragments.actor_ids();
1128
1129        let actor_new_dispatchers = dispatchers
1130            .into_values()
1131            .flatten()
1132            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1133            .collect();
1134
1135        let actor_splits = init_split_assignment
1136            .values()
1137            .flat_map(build_actor_connector_splits)
1138            .collect();
1139
1140        Some(Mutation::Update(UpdateMutation {
1141            actor_new_dispatchers,
1142            merge_update: merge_updates.into_values().flatten().collect(),
1143            dropped_actors,
1144            actor_splits,
1145            ..Default::default()
1146        }))
1147    }
1148
    /// Returns the ids of the streaming jobs (as `TableId`s) to be dropped, e.g. by
    /// `DropStreamingJobs`, which also implements job cancellation.
1150    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
1151        match self {
1152            Command::DropStreamingJobs {
1153                table_fragments_ids,
1154                ..
1155            } => Some(table_fragments_ids.iter().cloned()),
1156            _ => None,
1157        }
1158        .into_iter()
1159        .flatten()
1160    }
1161}
1162
1163impl Command {
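    /// Collect, for each downstream actor targeted by the given dispatchers, the upstream actors
    /// (with their host addresses) that it should register in its `MergeNode`. For reschedules,
    /// additionally wire newly added downstream actors to the remaining actors of their upstream
    /// fragments.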
1164    #[expect(clippy::type_complexity)]
1165    pub(super) fn collect_actor_upstreams(
1166        actor_dispatchers: impl Iterator<
1167            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1168        >,
1169        reschedule_dispatcher_update: Option<(
1170            &HashMap<FragmentId, Reschedule>,
1171            &HashMap<FragmentId, HashSet<ActorId>>,
1172        )>,
1173        graph_info: &InflightDatabaseInfo,
1174        control_stream_manager: &ControlStreamManager,
1175    ) -> HashMap<ActorId, ActorUpstreams> {
1176        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1177        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1178            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1179            for (upstream_actor_id, dispatchers) in upstream_actors {
1180                let upstream_actor_location =
1181                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1182                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1183                for downstream_actor_id in dispatchers
1184                    .iter()
1185                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1186                {
1187                    actor_upstreams
1188                        .entry(*downstream_actor_id)
1189                        .or_default()
1190                        .entry(upstream_fragment_id)
1191                        .or_default()
1192                        .insert(
1193                            upstream_actor_id,
1194                            ActorInfo {
1195                                actor_id: upstream_actor_id,
1196                                host: Some(upstream_actor_host.clone()),
1197                            },
1198                        );
1199                }
1200            }
1201        }
1202        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1203            for reschedule in reschedules.values() {
1204                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1205                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1206                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1207                    for upstream_actor_id in fragment_actors
1208                        .get(upstream_fragment_id)
1209                        .expect("should exist")
1210                    {
1211                        let upstream_actor_location =
1212                            upstream_fragment.actors[upstream_actor_id].worker_id;
1213                        let upstream_actor_host =
1214                            control_stream_manager.host_addr(upstream_actor_location);
1215                        if let Some(upstream_reschedule) = upstream_reschedule
1216                            && upstream_reschedule
1217                                .removed_actors
1218                                .contains(upstream_actor_id)
1219                        {
1220                            continue;
1221                        }
1222                        for (_, downstream_actor_id) in
1223                            reschedule
1224                                .added_actors
1225                                .iter()
1226                                .flat_map(|(worker_id, actors)| {
1227                                    actors.iter().map(|actor| (*worker_id, *actor))
1228                                })
1229                        {
1230                            actor_upstreams
1231                                .entry(downstream_actor_id)
1232                                .or_default()
1233                                .entry(*upstream_fragment_id)
1234                                .or_default()
1235                                .insert(
1236                                    *upstream_actor_id,
1237                                    ActorInfo {
1238                                        actor_id: *upstream_actor_id,
1239                                        host: Some(upstream_actor_host.clone()),
1240                                    },
1241                                );
1242                        }
1243                    }
1244                }
1245            }
1246        }
1247        actor_upstreams
1248    }
1249}