risingwave_meta/barrier/command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumns, PbUpstreamSinkInfo,
    ResumeMutation, SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo,
    ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
    SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in a fragment, such as for scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there are upstream fragments and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with connector, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job id for the new table fragments.
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
                is_existing: false,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                    is_existing: false,
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

impl StreamJobFragments {
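    /// Builds the in-flight fragment info for every fragment of this job, attaching to each
    /// actor its worker id, vnode bitmap, and the source splits assigned to it (if any) from
    /// the given split `assignment`.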
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and will be filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on the stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured beforehand, via reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creation progress will **last
    /// for a while** until the `finish` channel is signaled; only then will the state of `TableFragments`
    /// be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, and the post-collect behavior of this
    /// command, are very similar to those of the `Create` and `Drop` commands, for added and
    /// removed actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in upstream and downstream fragment of `reschedules`
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
    /// fragments of the `Merge` executors are additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from the source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}

// For debugging and observability purposes. Can add more details later if needed.
impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                                is_existing: false,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
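                    // For `CREATE SINK INTO table`, the target table's downstream fragment gains
                    // a new upstream sink: record an `AddNodeUpstream` change so that the
                    // inflight graph reflects the new edge from the sink fragment.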
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleFragment { reschedules, .. } => Some((
                None,
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints
        !matches!(self, Command::Resume)
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch
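    /// (e.g. if non-checkpoint barriers with prev-epochs `e1` and `e2` were injected since the
    /// last checkpoint, the checkpoint barrier with prev-epoch `e3` carries `vec![e1, e2, e3]`;
    /// these epochs are later passed to `build_table_change_log_delta` when committing).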
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating the barrier and doing post-collect work according to
/// the given [`Command`].
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch.value().0,
            self.barrier_info.curr_epoch.value().0,
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
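        // The truncate epoch is the prev-epoch's physical time minus `retention_second`. For
        // example, a prev-epoch at unix time 1_000_000s with a retention of 86_400s (one day)
        // yields a truncate epoch at unix time 913_600s. If the subtraction falls outside the
        // valid `Timestamptz` range, log a warning and fall back to the prev-epoch itself.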
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
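        // Aggregate the collected `BarrierCompleteResponse`s into the pieces of a single
        // `CommitEpochInfo`: synced SSTs with their context, new table watermarks, old-value
        // SSTs (for change logs), vector index additions, and tables to truncate.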
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
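        // Keep the minimum truncate epoch per upstream MV table: both subscriptions
        // (retention-based) and pinned log epochs of ongoing backfills can hold back
        // truncation, so the log store must retain data up to the smallest of them.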
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
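        // A newly created job that contains a `VectorIndexWrite` node also registers an `Init`
        // delta for its index table; the traversal below stops at that node (the closure
        // returns `false`).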
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id,
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the information of `dispatcher`s of `DispatchExecutor` and `actor_upstreams`s of `MergeNode`
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused with the same reason.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
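                    // Take the dispatchers built for the existing upstream fragments (those that
                    // now dispatch to the new job) out of the pre-built edge result; entries for
                    // other fragments remain in `edges`.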
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
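                // Replacing a job rewrites the graph edges in both directions: `merge_updates`
                // re-points the downstream `Merge` nodes to the new fragments, while
                // `dispatchers` wires the upstream fragments to dispatch to the new fragments.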
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
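                // For every upstream fragment of a rescheduled fragment, rewrite its dispatchers:
                // upstream actors that survive the reschedule gain the newly added downstream
                // actors (and the new hash mapping), while removed downstream actors are dropped
                // from all of them.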
1074                let mut dispatcher_update = HashMap::new();
1075                for reschedule in reschedules.values() {
1076                    for &(upstream_fragment_id, dispatcher_id) in
1077                        &reschedule.upstream_fragment_dispatcher_ids
1078                    {
1079                        // Find the actors of the upstream fragment.
1080                        let upstream_actor_ids = fragment_actors
1081                            .get(&upstream_fragment_id)
1082                            .expect("should contain");
1083
1084                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1085
1086                        // Record updates for all actors.
1087                        for &actor_id in upstream_actor_ids {
1088                            let added_downstream_actor_id = if upstream_reschedule
1089                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1090                                .unwrap_or(true)
1091                            {
1092                                reschedule
1093                                    .added_actors
1094                                    .values()
1095                                    .flatten()
1096                                    .cloned()
1097                                    .collect()
1098                            } else {
1099                                Default::default()
1100                            };
1101                            // Index with the dispatcher id to check duplicates.
1102                            dispatcher_update
1103                                .try_insert(
1104                                    (actor_id, dispatcher_id),
1105                                    DispatcherUpdate {
1106                                        actor_id,
1107                                        dispatcher_id,
1108                                        hash_mapping: reschedule
1109                                            .upstream_dispatcher_mapping
1110                                            .as_ref()
1111                                            .map(|m| m.to_protobuf()),
1112                                        added_downstream_actor_id,
1113                                        removed_downstream_actor_id: reschedule
1114                                            .removed_actors
1115                                            .iter()
1116                                            .cloned()
1117                                            .collect(),
1118                                    },
1119                                )
1120                                .unwrap();
1121                        }
1122                    }
1123                }
1124                let dispatcher_update = dispatcher_update.into_values().collect();
1125
1126                let mut merge_update = HashMap::new();
1127                for (&fragment_id, reschedule) in reschedules {
1128                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1129                        // Find the actors of the downstream fragment.
1130                        let downstream_actor_ids = fragment_actors
1131                            .get(&downstream_fragment_id)
1132                            .expect("should contain");
1133
1134                        // Downstream removed actors should be skipped
1135                        // Newly created actors of the current fragment will not dispatch Update
1136                        // barriers to them
1137                        let downstream_removed_actors: HashSet<_> = reschedules
1138                            .get(&downstream_fragment_id)
1139                            .map(|downstream_reschedule| {
1140                                downstream_reschedule
1141                                    .removed_actors
1142                                    .iter()
1143                                    .copied()
1144                                    .collect()
1145                            })
1146                            .unwrap_or_default();
1147
1148                        // Record updates for all actors.
1149                        for &actor_id in downstream_actor_ids {
1150                            if downstream_removed_actors.contains(&actor_id) {
1151                                continue;
1152                            }
1153
1154                            // Index with the fragment id to check duplicates.
1155                            merge_update
1156                                .try_insert(
1157                                    (actor_id, fragment_id),
1158                                    MergeUpdate {
1159                                        actor_id,
1160                                        upstream_fragment_id: fragment_id,
1161                                        new_upstream_fragment_id: None,
1162                                        added_upstream_actors: reschedule
1163                                            .added_actors
1164                                            .iter()
1165                                            .flat_map(|(worker_id, actors)| {
1166                                                let host =
1167                                                    control_stream_manager.host_addr(*worker_id);
1168                                                actors.iter().map(move |&actor_id| PbActorInfo {
1169                                                    actor_id,
1170                                                    host: Some(host.clone()),
1171                                                })
1172                                            })
1173                                            .collect(),
1174                                        removed_upstream_actor_id: reschedule
1175                                            .removed_actors
1176                                            .iter()
1177                                            .cloned()
1178                                            .collect(),
1179                                    },
1180                                )
1181                                .unwrap();
1182                        }
1183                    }
1184                }
1185                let merge_update = merge_update.into_values().collect();
1186
1187                let mut actor_vnode_bitmap_update = HashMap::new();
1188                for reschedule in reschedules.values() {
1189                    // Record updates for all actors in this fragment.
1190                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1191                        let bitmap = bitmap.to_protobuf();
1192                        actor_vnode_bitmap_update
1193                            .try_insert(actor_id, bitmap)
1194                            .unwrap();
1195                    }
1196                }
1197                let dropped_actors = reschedules
1198                    .values()
1199                    .flat_map(|r| r.removed_actors.iter().copied())
1200                    .collect();
1201                let mut actor_splits = HashMap::new();
1202                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1203                for (fragment_id, reschedule) in reschedules {
1204                    for (actor_id, splits) in &reschedule.actor_splits {
1205                        actor_splits.insert(
1206                            *actor_id,
1207                            ConnectorSplits {
1208                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1209                            },
1210                        );
1211                    }
1212
1213                    if let Some(assignment) =
1214                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1215                    {
1216                        actor_cdc_table_snapshot_splits.extend(assignment)
1217                    }
1218                }

                // We don't create new dispatchers in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_add_columns: Default::default(),
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

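            // Creating a subscription spawns no new actors; the `Add` mutation only
            // registers the new subscriber on the upstream materialized view.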
            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
        };
        Ok(mutation)
    }

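    /// Collects the actors that need to be created on each worker for this command.
    ///
    /// Returns `None` for commands that create no actors, and for snapshot-backfill
    /// streaming jobs, whose actors are handled separately.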
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    // For snapshot backfill, the actors to create are handled separately.
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [], // no subscribers for a newly created job
                        )
                    },
                )))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
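                // Group the newly created actors by worker and fragment, attaching the
                // fragment's stream node plan, its subscribers and each actor's upstreams.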
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
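                // Sinks whose schema is auto-refreshed along with this replacement get new
                // fragments of their own; merge their actors into the same per-worker map.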
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

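    /// Builds the `Update` mutation applied when a streaming job's fragments are replaced:
    /// new dispatchers and merge updates are installed, the old actors are dropped, and
    /// split and sink-column changes are attached.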
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

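        // Flatten the per-fragment dispatcher map into one `Dispatchers` entry per actor.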
        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

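        // Initial connector split assignment for the newly created actors.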
        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_add_columns: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id,
                            PbSinkAddColumns {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to be dropped;
    /// otherwise, returns an empty iterator.
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
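    /// Collects, for each downstream actor, the upstream actors it should register as
    /// inputs, keyed by upstream fragment id and carrying each upstream actor's host
    /// address. For reschedules, upstream actors that are being removed are skipped.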
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
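        // First pass: derive each downstream actor's upstreams directly from the
        // provided dispatchers.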
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
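        // Second pass (reschedule only): upstream actors that survive the reschedule must
        // also be registered as upstreams of the newly added downstream actors.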
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}