risingwave_meta/barrier/
command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, build_actor_connector_splits,
};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in a fragment, e.g. for scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: Vec<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there are upstream fragments and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we
    /// don't need to worry about `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job fragments id of the new table fragments.
    pub tmp_id: u32,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
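    ///
    /// An illustrative sketch (hypothetical ids, not from real data): if `replace_upstream` maps
    /// some downstream fragment's `Merge` upstreams as `{1 -> 10, 2 -> 20}`, the returned map is
    /// `{1: 10, 2: 20}`. The same old fragment may appear under several downstream fragments, but
    /// it must always map to the same new fragment, which is asserted below.
    ///
    /// ```ignore
    /// // Hypothetical usage, assuming a `plan: ReplaceStreamJobPlan` is at hand.
    /// for (old_fragment_id, new_fragment_id) in plan.fragment_replacements() {
    ///     println!("fragment {old_fragment_id} is replaced by {new_fragment_id}");
    /// }
    /// ```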
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    ///
    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and is filled in by the
    /// global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different
/// commands, it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
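///
/// A minimal sketch (illustrative only; `stream_job_fragments` is assumed to be a
/// [`StreamJobFragments`] fetched elsewhere) of how a command is typically built and inspected:
///
/// ```ignore
/// // Build a command that drops a cancelled streaming job.
/// let command = Command::cancel(&stream_job_fragments);
/// // Every command except `Resume` reports that it needs a checkpoint barrier.
/// assert!(command.need_checkpoint());
/// // Per-fragment changes (if any) to be applied to the inflight graph info.
/// let _changes = command.fragment_changes();
/// ```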
#[derive(Debug, strum::Display)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and
    /// committed, all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured that these streaming jobs are safe to
    /// drop by checking reference counts.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier with the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creation progress will
    /// **last for a while** until the `finish` channel is signaled, then the state of
    /// `TableFragments` will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, and the post-collection behavior of
    /// this command, are very similar to the `Create` and `Drop` commands, for added and removed
    /// actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        // Used for updating additional metadata after the barrier ends.
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`.
    /// This is essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
    /// fragments of the `Merge` executors are additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
    /// materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
    /// materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }

    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
        !matches!(self, Command::Resume)
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the preceding non-checkpoint barriers, plus the current
    /// prev-epoch.
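    ///
    /// For example (illustrative epochs only): if the barriers with prev-epochs `e1` and `e2` were
    /// non-checkpoint barriers and the current checkpoint barrier has prev-epoch `e3`, the kind of
    /// the current barrier is `Checkpoint(vec![e1, e2, e3])`.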
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating barriers and doing post-collection work according to
/// the given [`Command`].
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Different from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the
    /// corresponding barrier, including the process of waiting for the barrier to be sent, flowing
    /// through the stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

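    /// Computes the epoch up to which a subscribed log store may be truncated: the prev-epoch's
    /// physical time minus `retention_second`, converted back into an [`Epoch`]. If the
    /// subtraction falls outside the valid timestamp range, it falls back to the prev-epoch
    /// itself.
    ///
    /// A rough worked example (illustrative numbers only): with a prev-epoch whose physical time
    /// is `2025-01-01 00:10:00 UTC` and `retention_second = 600`, the returned epoch corresponds
    /// to `2025-01-01 00:00:00 UTC`.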
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor` and the
    /// `actor_upstreams` of `MergeNode`.
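    ///
    /// A usage sketch (illustrative only; `command`, `paused`, and `edges` are assumed to come
    /// from the global barrier worker's state):
    ///
    /// ```ignore
    /// // The mutation, if any, is attached to the injected barrier.
    /// let mutation = command.to_mutation(paused, &mut edges);
    /// if mutation.is_none() {
    ///     // e.g. `Flush`, or `Pause` while the cluster is already paused.
    /// }
    /// ```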
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused with the same reason.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id: reschedule
                                            .added_actors
                                            .values()
                                            .flatten()
                                            .cloned()
                                            .collect(),
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .clone(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped: newly created actors of
                        // the current fragment will not dispatch `Update` barriers to them.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actor_id: reschedule
                                            .added_actors
                                            .values()
                                            .flatten()
                                            .cloned()
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .clone(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }

                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();

                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // We don't create new dispatchers in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
        }
    }

    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // For snapshot backfill, the actors to create are handled separately.
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|((actor, dispatchers), _)| {
                                (actor.actor_id, *fragment_id, dispatchers.as_slice())
                            })
                    }),
                    Some((reschedules, fragment_actors)),
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

    /// For [`Command::DropStreamingJobs`] (including job cancellation), returns the ids of the
    /// dropped table fragments.
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
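    /// Collects, for each downstream actor, its upstream actors grouped by upstream fragment id,
    /// based on the dispatchers of the upstream actors (and, for reschedules, on the added and
    /// removed actors of the involved fragments).
    ///
    /// A rough sketch of the result's shape (illustrative ids only): if upstream actor `1` of
    /// fragment `10` dispatches to downstream actors `2` and `3`, the result contains
    /// `{2: {10: {1}}, 3: {10: {1}}}`.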
    #[expect(clippy::type_complexity)]
    pub fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<Item = (ActorId, FragmentId, &[Dispatcher])>,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_actor_id, upstream_fragment_id, dispatchers) in actor_dispatchers {
            for downstream_actor_id in dispatchers
                .iter()
                .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
            {
                actor_upstreams
                    .entry(*downstream_actor_id)
                    .or_default()
                    .entry(upstream_fragment_id)
                    .or_default()
                    .insert(upstream_actor_id);
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for downstream_actor_id in reschedule.added_actors.values().flatten() {
                            actor_upstreams
                                .entry(*downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(*upstream_actor_id);
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}