risingwave_meta/barrier/command.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
    SplitState, ThrottleConfig, UpstreamSinkInfo, build_actor_connector_splits,
};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in a fragment, e.g. for scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with connector, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job id of the new table fragments.
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
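    /// Collect the per-fragment changes implied by this replace plan: every fragment of the new
    /// job is added, every fragment of the old job is removed, and the downstream fragments listed
    /// in `replace_upstream` get their `Merge` upstream rewritten. Auto-refresh-schema sinks
    /// contribute one additional new/removed fragment pair each.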
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

impl StreamJobFragments {
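    /// Build the in-flight info for each fragment of this job, attaching to every actor its
    /// worker id, vnode bitmap, and the source splits assigned to it (if any) in `assignment`.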
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and
    /// committed, all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured that these streaming jobs are safe to
    /// drop by checking their reference counts.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of the compute nodes to
    /// drop the actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier with the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should pass through them.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creation progress will
    /// **last for a while** until the `finish` channel is signaled; only then is the state of
    /// `TableFragments` set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, and the post-collect behavior of this
    /// command, are very similar to those of the `Create` and `Drop` commands, for added and
    /// removed actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
    /// fragments of the `Merge` executors are additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle(ThrottleConfig),

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// the materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// the materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from the source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}

// For debugging and observability purposes. Can add more details later if needed.
impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle(_) => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

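    /// Returns `None` if the command does not change the fragment topology. Otherwise, returns the
    /// newly created job (if any, together with an optional CDC backfill tracker) and the
    /// per-fragment changes to apply to the in-flight database info.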
    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleFragment { reschedules, .. } => Some((
                None,
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
        }
    }

    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints
        !matches!(self, Command::Resume)
    }
}

#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the prev-epochs of the preceding non-checkpoint barriers plus the current prev-epoch.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating the barrier and doing post-collect work according to
/// the given [`Command`].
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch(),
            self.barrier_info.curr_epoch(),
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

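    /// Compute the log store truncate epoch for a subscription: the physical time of `prev_epoch`
    /// minus `retention_second`, converted back to an epoch; e.g. `retention_second = 3600`
    /// truncates to roughly one hour before the current barrier. Falls back to `prev_epoch` itself
    /// if the subtraction yields an invalid timestamp.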
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
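        // Keep the smallest truncate epoch per upstream MV table, so that the log store retains
        // enough history for every subscription and every pinned backfill handled below.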
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
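        // For a newly created streaming job, register an `Init` delta for every vector index table
        // written by a `VectorIndexWrite` node in its fragments.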
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id,
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the `dispatcher`s of `DispatchExecutor` and the `actor_upstreams` of `MergeNode`.
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused with the same reason.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle(config) => {
                let mut actor_to_apply = HashMap::new();
                for per_fragment in config.values() {
                    actor_to_apply.extend(
                        per_fragment
                            .iter()
                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
                    );
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    actor_throttle: actor_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
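                // A snapshot backfill job registers itself as a subscriber on each upstream MV
                // table it backfills from, so that the upstream MVs keep storing the old values
                // it needs (cf. `Command::CreateSubscription` and `backfill_pinned_log_epoch`).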
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

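                // For `CREATE SINK INTO table`, tell the downstream (table) fragment about the new
                // upstream sink fragment and the sink actors it should receive data from.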
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

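                // Assign CDC table snapshot (parallelized backfill) splits for the new job, if
                // any; snapshot backfill jobs are excluded here.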
                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

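                // Build `Merge` updates so that downstream fragments pick up the actors added to
                // and removed from this fragment; actors that are themselves being removed
                // downstream are skipped.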
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

1219                // we don't create dispatchers in reschedule scenario
1220                let actor_new_dispatchers = HashMap::new();
1221                let mutation = Mutation::Update(UpdateMutation {
1222                    dispatcher_update,
1223                    merge_update,
1224                    actor_vnode_bitmap_update,
1225                    dropped_actors,
1226                    actor_splits,
1227                    actor_new_dispatchers,
1228                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1229                        splits: actor_cdc_table_snapshot_splits,
1230                    }),
1231                    sink_schema_change: Default::default(),
1232                    subscriptions_to_drop: vec![],
1233                });
1234                tracing::debug!("update mutation: {mutation:?}");
1235                Some(mutation)
1236            }
1237
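            // Creating a subscription reuses `AddMutation`, with only `subscriptions_to_add`
            // populated; no actors or splits are added.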
1238            Command::CreateSubscription {
1239                upstream_mv_table_id,
1240                subscription_id,
1241                ..
1242            } => Some(Mutation::Add(AddMutation {
1243                actor_dispatchers: Default::default(),
1244                added_actors: vec![],
1245                actor_splits: Default::default(),
1246                pause: false,
1247                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1248                    upstream_mv_table_id: *upstream_mv_table_id,
1249                    subscriber_id: subscription_id.as_subscriber_id(),
1250                }],
1251                backfill_nodes_to_pause: vec![],
1252                actor_cdc_table_snapshot_splits: None,
1253                new_upstream_sinks: Default::default(),
1254            })),
1255            Command::DropSubscription {
1256                upstream_mv_table_id,
1257                subscription_id,
1258            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1259                info: vec![SubscriptionUpstreamInfo {
1260                    subscriber_id: subscription_id.as_subscriber_id(),
1261                    upstream_mv_table_id: *upstream_mv_table_id,
1262                }],
1263            })),
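            // Map each affected object id to its new connector properties.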
1264            Command::ConnectorPropsChange(config) => {
1265                let mut connector_props_infos = HashMap::default();
1266                for (k, v) in config {
1267                    connector_props_infos.insert(
1268                        k.as_raw_id(),
1269                        ConnectorPropsInfo {
1270                            connector_props_info: v.clone(),
1271                        },
1272                    );
1273                }
1274                Some(Mutation::ConnectorPropsChange(
1275                    ConnectorPropsChangeMutation {
1276                        connector_props_infos,
1277                    },
1278                ))
1279            }
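            // Refresh / ListFinish / LoadFinish map directly onto their mutations, carrying
            // only the target table id and/or its associated source id.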
1280            Command::Refresh {
1281                table_id,
1282                associated_source_id,
1283            } => Some(Mutation::RefreshStart(
1284                risingwave_pb::stream_plan::RefreshStartMutation {
1285                    table_id: *table_id,
1286                    associated_source_id: *associated_source_id,
1287                },
1288            )),
1289            Command::ListFinish {
1290                table_id: _,
1291                associated_source_id,
1292            } => Some(Mutation::ListFinish(ListFinishMutation {
1293                associated_source_id: *associated_source_id,
1294            })),
1295            Command::LoadFinish {
1296                table_id: _,
1297                associated_source_id,
1298            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1299                associated_source_id: *associated_source_id,
1300            })),
1301        };
1302        Ok(mutation)
1303    }
1304
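    /// Returns the new actors that this command requires to be created, grouped by worker and
    /// fragment, or `None` if the command creates no actors.
    ///
    /// Only `CreateStreamingJob` (except snapshot backfill, which is handled separately),
    /// `RescheduleFragment` and `ReplaceStreamJob` create actors here.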
1305    pub(super) fn actors_to_create(
1306        &self,
1307        database_info: &InflightDatabaseInfo,
1308        edges: &mut Option<FragmentEdgeBuildResult>,
1309        control_stream_manager: &ControlStreamManager,
1310    ) -> Option<StreamJobActorsToCreate> {
1311        match self {
1312            Command::CreateStreamingJob { info, job_type, .. } => {
1313                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1314                    // For snapshot backfill, the actors to create are collected separately.
1315                    return None;
1316                }
1317                let actors_to_create = info.stream_job_fragments.actors_to_create();
1318                let edges = edges.as_mut().expect("should exist");
1319                Some(edges.collect_actors_to_create(actors_to_create.map(
1320                    |(fragment_id, node, actors)| {
1321                        (
1322                            fragment_id,
1323                            node,
1324                            actors,
1325                            [], // no subscribers for a newly created job
1326                        )
1327                    },
1328                )))
1329            }
1330            Command::RescheduleFragment {
1331                reschedules,
1332                fragment_actors,
1333                ..
1334            } => {
1335                let mut actor_upstreams = Self::collect_actor_upstreams(
1336                    reschedules.iter().map(|(fragment_id, reschedule)| {
1337                        (
1338                            *fragment_id,
1339                            reschedule.newly_created_actors.values().map(
1340                                |((actor, dispatchers), _)| {
1341                                    (actor.actor_id, dispatchers.as_slice())
1342                                },
1343                            ),
1344                        )
1345                    }),
1346                    Some((reschedules, fragment_actors)),
1347                    database_info,
1348                    control_stream_manager,
1349                );
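                // Group the newly created actors by worker and fragment. Each fragment entry
                // carries its stream node, the actors to build (with their upstreams and
                // dispatchers), and the fragment's subscribers.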
1350                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1351                for (fragment_id, (actor, dispatchers), worker_id) in
1352                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1353                        reschedule
1354                            .newly_created_actors
1355                            .values()
1356                            .map(|(actors, status)| (*fragment_id, actors, status))
1357                    })
1358                {
1359                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1360                    map.entry(*worker_id)
1361                        .or_default()
1362                        .entry(fragment_id)
1363                        .or_insert_with(|| {
1364                            let node = database_info.fragment(fragment_id).nodes.clone();
1365                            let subscribers =
1366                                database_info.fragment_subscribers(fragment_id).collect();
1367                            (node, vec![], subscribers)
1368                        })
1369                        .1
1370                        .push((actor.clone(), upstreams, dispatchers.clone()));
1371                }
1372                Some(map)
1373            }
1374            Command::ReplaceStreamJob(replace_table) => {
1375                let edges = edges.as_mut().expect("should exist");
1376                let mut actors = edges.collect_actors_to_create(
1377                    replace_table.new_fragments.actors_to_create().map(
1378                        |(fragment_id, node, actors)| {
1379                            (
1380                                fragment_id,
1381                                node,
1382                                actors,
1383                                database_info
1384                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1385                            )
1386                        },
1387                    ),
1388                );
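                // Auto-refresh-schema sinks come with brand-new fragments as well; merge
                // their actors into the same per-worker map.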
1389                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1390                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1391                        (
1392                            sink.new_fragment.fragment_id,
1393                            &sink.new_fragment.nodes,
1394                            sink.new_fragment.actors.iter().map(|actor| {
1395                                (
1396                                    actor,
1397                                    sink.actor_status[&actor.actor_id]
1398                                        .location
1399                                        .as_ref()
1400                                        .unwrap()
1401                                        .worker_node_id,
1402                                )
1403                            }),
1404                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1405                        )
1406                    }));
1407                    for (worker_id, fragment_actors) in sink_actors {
1408                        actors.entry(worker_id).or_default().extend(fragment_actors);
1409                    }
1410                }
1411                Some(actors)
1412            }
1413            _ => None,
1414        }
1415    }
1416
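    /// Builds the `UpdateMutation` used when replacing a stream job: drop the old actors,
    /// rewire merges to the new fragments, install the new dispatchers and split assignments,
    /// and, for auto-refresh-schema sinks, attach the per-sink schema change.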
1417    fn generate_update_mutation_for_replace_table(
1418        dropped_actors: impl IntoIterator<Item = ActorId>,
1419        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1420        dispatchers: FragmentActorDispatchers,
1421        init_split_assignment: &SplitAssignment,
1422        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1423        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1424    ) -> Option<Mutation> {
1425        let dropped_actors = dropped_actors.into_iter().collect();
1426
1427        let actor_new_dispatchers = dispatchers
1428            .into_values()
1429            .flatten()
1430            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1431            .collect();
1432
1433        let actor_splits = init_split_assignment
1434            .values()
1435            .flat_map(build_actor_connector_splits)
1436            .collect();
1437        Some(Mutation::Update(UpdateMutation {
1438            actor_new_dispatchers,
1439            merge_update: merge_updates.into_values().flatten().collect(),
1440            dropped_actors,
1441            actor_splits,
1442            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
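            // For each auto-refresh-schema sink, record its original schema together with
            // the newly added columns as an `AddColumns` schema change.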
1443            sink_schema_change: auto_refresh_schema_sinks
1444                .as_ref()
1445                .into_iter()
1446                .flat_map(|sinks| {
1447                    sinks.iter().map(|sink| {
1448                        (
1449                            sink.original_sink.id.as_raw_id(),
1450                            PbSinkSchemaChange {
1451                                original_schema: sink
1452                                    .original_sink
1453                                    .columns
1454                                    .iter()
1455                                    .map(|col| PbField {
1456                                        data_type: Some(
1457                                            col.column_desc
1458                                                .as_ref()
1459                                                .unwrap()
1460                                                .column_type
1461                                                .as_ref()
1462                                                .unwrap()
1463                                                .clone(),
1464                                        ),
1465                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1466                                    })
1467                                    .collect(),
1468                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1469                                    fields: sink
1470                                        .newly_add_fields
1471                                        .iter()
1472                                        .map(|field| field.to_prost())
1473                                        .collect(),
1474                                })),
1475                            },
1476                        )
1477                    })
1478                })
1479                .collect(),
1480            ..Default::default()
1481        }))
1482    }
1483
1484    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to drop.
1485    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1486        match self {
1487            Command::DropStreamingJobs {
1488                streaming_job_ids, ..
1489            } => Some(streaming_job_ids.iter().cloned()),
1490            _ => None,
1491        }
1492        .into_iter()
1493        .flatten()
1494    }
1495}
1496
1497impl Command {
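    /// Collects, for each downstream actor, the upstream actors (keyed by upstream fragment
    /// id) that it should receive data from.
    ///
    /// Upstream info is derived from the given dispatchers; for `RescheduleFragment`, the
    /// surviving upstream actors are additionally wired to the newly added downstream actors.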
1498    #[expect(clippy::type_complexity)]
1499    pub(super) fn collect_actor_upstreams(
1500        actor_dispatchers: impl Iterator<
1501            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1502        >,
1503        reschedule_dispatcher_update: Option<(
1504            &HashMap<FragmentId, Reschedule>,
1505            &HashMap<FragmentId, HashSet<ActorId>>,
1506        )>,
1507        database_info: &InflightDatabaseInfo,
1508        control_stream_manager: &ControlStreamManager,
1509    ) -> HashMap<ActorId, ActorUpstreams> {
1510        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
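        // Derive upstream info from the provided dispatchers: each downstream actor records
        // every upstream actor (with its host) that dispatches to it.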
1511        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1512            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1513            for (upstream_actor_id, dispatchers) in upstream_actors {
1514                let upstream_actor_location =
1515                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1516                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
1517                for downstream_actor_id in dispatchers
1518                    .iter()
1519                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1520                {
1521                    actor_upstreams
1522                        .entry(*downstream_actor_id)
1523                        .or_default()
1524                        .entry(upstream_fragment_id)
1525                        .or_default()
1526                        .insert(
1527                            upstream_actor_id,
1528                            PbActorInfo {
1529                                actor_id: upstream_actor_id,
1530                                host: Some(upstream_actor_host.clone()),
1531                            },
1532                        );
1533                }
1534            }
1535        }
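        // For reschedules, newly added actors also need upstream info from the existing
        // upstream actors, skipping upstream actors that the reschedule removes.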
1536        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1537            for reschedule in reschedules.values() {
1538                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1539                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1540                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1541                    for upstream_actor_id in fragment_actors
1542                        .get(upstream_fragment_id)
1543                        .expect("should exist")
1544                    {
1545                        let upstream_actor_location =
1546                            upstream_fragment.actors[upstream_actor_id].worker_id;
1547                        let upstream_actor_host =
1548                            control_stream_manager.host_addr(upstream_actor_location);
1549                        if let Some(upstream_reschedule) = upstream_reschedule
1550                            && upstream_reschedule
1551                                .removed_actors
1552                                .contains(upstream_actor_id)
1553                        {
1554                            continue;
1555                        }
1556                        for (_, downstream_actor_id) in
1557                            reschedule
1558                                .added_actors
1559                                .iter()
1560                                .flat_map(|(worker_id, actors)| {
1561                                    actors.iter().map(|actor| (*worker_id, *actor))
1562                                })
1563                        {
1564                            actor_upstreams
1565                                .entry(downstream_actor_id)
1566                                .or_default()
1567                                .entry(*upstream_fragment_id)
1568                                .or_default()
1569                                .insert(
1570                                    *upstream_actor_id,
1571                                    PbActorInfo {
1572                                        actor_id: *upstream_actor_id,
1573                                        host: Some(upstream_actor_host.clone()),
1574                                    },
1575                                );
1576                        }
1577                    }
1578                }
1579            }
1580        }
1581        actor_upstreams
1582    }
1583}