risingwave_meta/barrier/command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
    UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::{
    ConnectorPropsChange, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
    build_actor_connector_splits,
};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, e.g. for scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
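
// Illustrative sketch (hypothetical actor/worker ids, not taken from a real plan): scaling a
// fragment that runs actors {1, 2} on worker 1 out to a new actor 3 on worker 2, while dropping
// actor 1, could be described as
//
//     added_actors:         {2 (worker id): [3]}
//     removed_actors:       {1}
//     vnode_bitmap_updates: {2: <new vnode bitmap for actor 2>}
//
// plus updated dispatcher mappings for the upstream fragments listed in
// `upstream_fragment_dispatcher_ids`.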

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy id of the new job fragments.
    pub tmp_id: u32,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
            let fragment_change =
                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}
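
// Illustrative sketch (hypothetical fragment ids): if downstream fragment 1 has its `Merge`
// upstream switched from the old fragment 10 to the new fragment 20, then `replace_upstream`
// contains `{1: {10: 20}}` and `fragment_replacements()` returns `{10: 20}`.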

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info(
        &self,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
        self.fragments.values().map(|fragment| {
            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id()
                                        as WorkerId,
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment
                        .state_table_ids
                        .iter()
                        .map(|table_id| TableId::new(*table_id))
                        .collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
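
// Illustrative sketch (hypothetical table ids): a snapshot-backfill job reading from upstream
// materialized views 1 and 2 starts out as `{1: None, 2: None}`; the global barrier worker later
// fills in the chosen backfill epoch, e.g. `{1: Some(e), 2: Some(e)}`.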

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(ReplaceStreamJobPlan),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [perform different work after the barrier is collected](CommandContext::post_collect).
#[derive(Debug, strum::Display)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only if**
    /// the cluster is currently paused. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured beforehand, via reference counts, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should pass through them.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled; only then will the state of `TableFragments`
    /// be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    MergeSnapshotBackfillStreamingJobs(
        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
    ),

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, as well as the post-collection behavior
    /// of this command, are very similar to the `Create` and `Drop` commands, for added and removed
    /// actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`.
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
        // Used for updating additional metadata after the barrier ends.
        post_updates: JobReschedulePostUpdates,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
    /// of the `Merge` executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitAssignment),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when no
    /// subscription depends on it anymore.
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
        }
    }
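
    // Illustrative sketch: cancelling a creating job is expressed as dropping it. Assuming a
    // `StreamJobFragments` value `fragments` for the job at hand (hypothetical variable), a caller
    // would build
    //
    //     let command = Command::cancel(&fragments);
    //
    // which stops all of the job's actors and unregisters its state tables and fragments.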

    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                ..
            } => Some(
                unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .collect(),
            ),
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info()
                    .map(|(fragment_id, fragment_info)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment(
                                info.streaming_job.id().into(),
                                fragment_info,
                            ),
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
                    let extra_change = plan.fragment_changes();
                    changes.extend(extra_change);
                }

                Some(changes)
            }
            Command::RescheduleFragment { reschedules, .. } => Some(
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // Only keep the existing actors.
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                            },
                        )
                    })
                    .collect(),
            ),
            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
            Command::SourceChangeSplit(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
        !matches!(self, Command::Resume)
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating the barrier and doing post-collection work according
/// to the given [`Command`].
pub(super) struct CommandContext {
    subscription_info: InflightSubscriptionInfo,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        subscription_info: InflightSubscriptionInfo,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            subscription_info,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }
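
    // Worked sketch (hypothetical values): if the prev-epoch's physical time is
    // 2024-01-01T00:10:00Z and `retention_second` is 600, the truncate epoch corresponds to
    // 2024-01-01T00:00:00Z. If the subtraction falls outside the valid `Timestamptz` range,
    // we fall back to the prev-epoch itself and only log a warning.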

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
    ) {
        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
            collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> = table_fragments
                    .internal_table_ids()
                    .into_iter()
                    .map(TableId::new)
                    .collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(TableId::new(mv_table_id));
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
                .entry(table_id.table_id)
            {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
            if let Some(truncate_epoch) = subscriptions
                .values()
                .max()
                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
            {
                update_truncate_epoch(*mv_table_id, truncate_epoch);
            }
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor` and the
    /// `actor_upstreams` of `MergeNode`.
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<Mutation> {
        match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is currently paused.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(change) => {
                let mut diff = HashMap::new();

                for actor_splits in change.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments: table_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = table_fragments.actor_ids();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: table_fragments.stream_job_id().table_id,
                                upstream_mv_table_id: table_id.table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let add = Some(Mutation::Add(AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                }));

                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
                    old_fragments,
                    init_split_assignment,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    ..
                }) = job_type
                {
                    let merge_updates = edges
                        .merge_updates
                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                        .collect();
                    let dispatchers = edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .collect();
                    let update = Self::generate_update_mutation_for_replace_table(
                        old_fragments,
                        merge_updates,
                        dispatchers,
                        init_split_assignment,
                    );

                    Some(Mutation::Combined(CombinedMutation {
                        mutations: vec![
                            BarrierMutation { mutation: add },
                            BarrierMutation { mutation: update },
                        ],
                    }))
                } else {
                    add
                }
            }
            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                    info: jobs_to_merge
                        .iter()
                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
                            backfill_upstream_tables
                                .iter()
                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
                                    subscriber_id: table_id.table_id,
                                    upstream_mv_table_id: upstream_table_id.table_id,
                                })
                        })
                        .collect(),
                }))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                Self::generate_update_mutation_for_replace_table(
                    old_fragments,
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped.
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |actor_id| ActorInfo {
                                                    actor_id: *actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();

                for reschedule in reschedules.values() {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id as ActorId,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }
                }

                // We don't create dispatchers in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();

                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                    subscriber_id: *subscription_id,
                }],
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: *subscription_id,
                    upstream_mv_table_id: upstream_mv_table_id.table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        *k,
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
        }
    }

    pub(super) fn actors_to_create(
        &self,
        graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                let sink_into_table_replace_plan = match job_type {
                    CreateStreamingJobType::Normal => None,
                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // For snapshot backfill, the actors to create are handled separately.
                        return None;
                    }
                };
                let get_actors_to_create = || {
                    sink_into_table_replace_plan
                        .map(|plan| plan.new_fragments.actors_to_create())
                        .into_iter()
                        .flatten()
                        .chain(info.stream_job_fragments.actors_to_create())
                };
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(get_actors_to_create()))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    graph_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = graph_info.fragment(fragment_id).nodes.clone();
                            (node, vec![])
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
            }
            _ => None,
        }
    }

    fn generate_update_mutation_for_replace_table(
        old_fragments: &StreamJobFragments,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
    ) -> Option<Mutation> {
        let dropped_actors = old_fragments.actor_ids();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            ..Default::default()
        }))
    }

    /// For dropping or cancelling streaming jobs, returns the ids of the target tables.
    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        graph_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            ActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    ActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}
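
// A minimal test sketch, added for illustration and not part of the upstream file: it exercises
// only the dependency-free helpers above (`BarrierKind` and the trivial `Command` constructors),
// since the remaining methods need a running meta environment to drive them.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn barrier_kind_helpers() {
        let kind = BarrierKind::Checkpoint(vec![233]);
        assert!(kind.is_checkpoint());
        assert!(!kind.is_initial());
        assert_eq!(kind.as_str_name(), "Checkpoint");
        assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
    }

    #[test]
    fn command_basics() {
        // Every command except `Resume` forces a checkpoint.
        assert!(Command::Flush.need_checkpoint());
        assert!(!Command::resume().need_checkpoint());
        // `strum::Display` renders the variant name.
        assert_eq!(Command::pause().to_string(), "Pause");
    }
}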