risingwave_meta/barrier/command.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::PbActorInfo;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::source::{
    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
};
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
use crate::stream::{
    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
    SplitState, UpstreamSinkInfo, build_actor_connector_splits,
};
/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in a fragment, e.g. for scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there is an upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    pub old_fragments: StreamJobFragments,
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't
    /// need to worry about `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`.
    pub streaming_job: StreamingJob,
    /// The temporary dummy job id for the new fragments.
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

impl ReplaceStreamJobPlan {
    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
        let mut fragment_changes = HashMap::new();
        for (fragment_id, new_fragment) in self
            .new_fragments
            .new_fragment_info(&self.init_split_assignment)
        {
            let fragment_change = CommandFragmentChanges::NewFragment {
                job_id: self.streaming_job.id(),
                info: new_fragment,
            };
            fragment_changes
                .try_insert(fragment_id, fragment_change)
                .expect("non-duplicate");
        }
        for fragment in self.old_fragments.fragments.values() {
            fragment_changes
                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
                .expect("non-duplicate");
        }
        for (fragment_id, replace_map) in &self.replace_upstream {
            fragment_changes
                .try_insert(
                    *fragment_id,
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
                )
                .expect("non-duplicate");
        }
        if let Some(sinks) = &self.auto_refresh_schema_sinks {
            for sink in sinks {
                let fragment_change = CommandFragmentChanges::NewFragment {
                    job_id: sink.original_sink.id.as_job_id(),
                    info: sink.new_fragment_info(),
                };
                fragment_changes
                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
                    .expect("non-duplicate");
                fragment_changes
                    .try_insert(
                        sink.original_fragment.fragment_id,
                        CommandFragmentChanges::RemoveFragment,
                    )
                    .expect("non-duplicate");
            }
        }
        fragment_changes
    }

    /// `old_fragment_id` -> `new_fragment_id`
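    ///
    /// A rough sketch of the expected shape, with purely illustrative fragment ids
    /// (not a compiled doctest; `plan` stands in for a `ReplaceStreamJobPlan` built elsewhere):
    ///
    /// ```ignore
    /// // Suppose `replace_upstream` says: downstream fragment 100 rewires its upstream
    /// // fragment 1 to the new fragment 11, and fragment 2 to 12. Then:
    /// let replacements = plan.fragment_replacements();
    /// assert_eq!(replacements[&1], 11);
    /// assert_eq!(replacements[&2], 12);
    /// ```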
    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        let mut fragment_replacements = HashMap::new();
        for (upstream_fragment_id, new_upstream_fragment_id) in
            self.replace_upstream.values().flatten()
        {
            {
                let r =
                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
                if let Some(r) = r {
                    assert_eq!(
                        *new_upstream_fragment_id, r,
                        "one fragment is replaced by multiple fragments"
                    );
                }
            }
        }
        fragment_replacements
    }
}

#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: FragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

impl StreamJobFragments {
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`.
    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    Normal,
    SinkIntoTable(UpstreamSinkInfo),
    SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
// FIXME: this enum is significantly large on the stack; consider boxing it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has already ensured, by reference counting, that these
    /// streaming jobs are safe to drop.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop the actors, and then deletes the job fragments info from the meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected, since the barrier should pass through them.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creation progress will **last
    /// for a while** until the `finish` channel is signaled; only then is the state of `TableFragments`
    /// set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
    /// Mainly used for scaling and migration.
    ///
    /// The actors from which barriers should be collected, as well as the post-collection behavior
    /// of this command, are very similar to those of the `Create` and `Drop` commands, for added
    /// and removed actors respectively.
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },

    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
    /// fragments of the `Merge` executors are additionally changed.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, Option<u32>>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
    /// materialize executor to start storing old values for the subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
    /// materialize executor to stop storing old values when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating its state
    /// and reloading data from the source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
}

// For debugging and observability purposes. Can add more details later if needed.
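// A hedged sketch of the rendered strings (ids and job names are illustrative only):
//
//     assert_eq!(Command::Flush.to_string(), "Flush");
//     assert_eq!(Command::Resume.to_string(), "Resume");
//     // `DropStreamingJobs` lists the sorted job ids, e.g. "DropStreamingJobs: 1, 2, 3".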
impl std::fmt::Display for Command {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Command::Flush => write!(f, "Flush"),
            Command::Pause => write!(f, "Pause"),
            Command::Resume => write!(f, "Resume"),
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => {
                write!(
                    f,
                    "DropStreamingJobs: {}",
                    streaming_job_ids.iter().sorted().join(", ")
                )
            }
            Command::CreateStreamingJob { info, .. } => {
                write!(f, "CreateStreamingJob: {}", info.streaming_job)
            }
            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
            Command::ReplaceStreamJob(plan) => {
                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
            }
            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
            Command::Throttle { .. } => write!(f, "Throttle"),
            Command::CreateSubscription {
                subscription_id, ..
            } => write!(f, "CreateSubscription: {subscription_id}"),
            Command::DropSubscription {
                subscription_id, ..
            } => write!(f, "DropSubscription: {subscription_id}"),
            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
            Command::Refresh {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "Refresh: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::ListFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "ListFinish: {} (source: {})",
                table_id, associated_source_id
            ),
            Command::LoadFinish {
                table_id,
                associated_source_id,
            } => write!(
                f,
                "LoadFinish: {} (source: {})",
                table_id, associated_source_id
            ),
        }
    }
}

impl Command {
    pub fn pause() -> Self {
        Self::Pause
    }

    pub fn resume() -> Self {
        Self::Resume
    }

    #[expect(clippy::type_complexity)]
    pub(super) fn fragment_changes(
        &self,
    ) -> Option<(
        Option<(JobId, Option<CdcTableBackfillTracker>)>,
        HashMap<FragmentId, CommandFragmentChanges>,
    )> {
        match self {
            Command::Flush => None,
            Command::Pause => None,
            Command::Resume => None,
            Command::DropStreamingJobs {
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            } => {
                let changes = unregistered_fragment_ids
                    .iter()
                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
                    .chain(dropped_sink_fragment_by_targets.iter().map(
                        |(target_fragment, sink_fragments)| {
                            (
                                *target_fragment,
                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
                            )
                        },
                    ))
                    .collect();

                Some((None, changes))
            }
            Command::CreateStreamingJob { info, job_type, .. } => {
                assert!(
                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
                    "should handle fragment changes separately for snapshot backfill"
                );
                let mut changes: HashMap<_, _> = info
                    .stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(|(fragment_id, fragment_infos)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::NewFragment {
                                job_id: info.streaming_job.id(),
                                info: fragment_infos,
                            },
                        )
                    })
                    .collect();

                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    changes.insert(
                        downstream_fragment_id,
                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        }),
                    );
                }

                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };

                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
            }
            Command::RescheduleFragment { reschedules, .. } => Some((
                None,
                reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            CommandFragmentChanges::Reschedule {
                                new_actors: reschedule
                                    .added_actors
                                    .iter()
                                    .flat_map(|(node_id, actors)| {
                                        actors.iter().map(|actor_id| {
                                            (
                                                *actor_id,
                                                InflightActorInfo {
                                                    worker_id: *node_id,
                                                    vnode_bitmap: reschedule
                                                        .newly_created_actors
                                                        .get(actor_id)
                                                        .expect("should exist")
                                                        .0
                                                        .0
                                                        .vnode_bitmap
                                                        .clone(),
                                                    splits: reschedule
                                                        .actor_splits
                                                        .get(actor_id)
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                },
                                            )
                                        })
                                    })
                                    .collect(),
                                actor_update_vnode_bitmap: reschedule
                                    .vnode_bitmap_updates
                                    .iter()
                                    .filter(|(actor_id, _)| {
                                        // only keep the existing actors
                                        !reschedule.newly_created_actors.contains_key(actor_id)
                                    })
                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                                    .collect(),
                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
                                actor_splits: reschedule.actor_splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => Some((
                None,
                split_assignment
                    .iter()
                    .map(|(&fragment_id, splits)| {
                        (
                            fragment_id,
                            CommandFragmentChanges::SplitAssignment {
                                actor_splits: splits.clone(),
                            },
                        )
                    })
                    .collect(),
            )),
            Command::Throttle { .. } => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
            Command::ConnectorPropsChange(_) => None,
            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
        }
    }

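    /// Whether the barrier carrying this command must be a checkpoint barrier.
    ///
    /// A rough sketch of the current rule (not a compiled doctest): every command except
    /// `Resume` forces a checkpoint.
    ///
    /// ```ignore
    /// assert!(Command::Flush.need_checkpoint());
    /// assert!(!Command::Resume.need_checkpoint());
    /// ```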
    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
        !matches!(self, Command::Resume)
    }
}

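/// The kind of barrier tracked by the barrier manager.
///
/// A hedged sketch of how the kind maps to the protobuf representation and the checkpoint
/// flag (not a compiled doctest; `prev_epoch` is an illustrative `u64`):
///
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![prev_epoch]);
/// assert!(kind.is_checkpoint());
/// assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
/// assert_eq!(BarrierKind::Barrier.as_str_name(), "Barrier");
/// ```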
#[derive(Debug, Clone)]
pub enum BarrierKind {
    Initial,
    Barrier,
    /// Holds the list of previous non-checkpoint prev-epochs, plus the current prev-epoch.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match self {
            BarrierKind::Initial => PbBarrierKind::Initial,
            BarrierKind::Barrier => PbBarrierKind::Barrier,
            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
        }
    }

    pub fn is_checkpoint(&self) -> bool {
        matches!(self, BarrierKind::Checkpoint(_))
    }

    pub fn is_initial(&self) -> bool {
        matches!(self, BarrierKind::Initial)
    }

    pub fn as_str_name(&self) -> &'static str {
        match self {
            BarrierKind::Initial => "Initial",
            BarrierKind::Barrier => "Barrier",
            BarrierKind::Checkpoint(_) => "Checkpoint",
        }
    }
}

/// [`CommandContext`] is used for generating barriers and doing post-collection work according
/// to the given [`Command`].
pub(super) struct CommandContext {
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: Option<Command>,

    /// The tracing span of this command.
    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, covering the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
    _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CommandContext")
            .field("barrier_info", &self.barrier_info)
            .field("command", &self.command)
            .finish()
    }
}

impl std::fmt::Display for CommandContext {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "prev_epoch={}, curr_epoch={}, kind={}",
            self.barrier_info.prev_epoch(),
            self.barrier_info.curr_epoch(),
            self.barrier_info.kind.as_str_name()
        )?;
        if let Some(command) = &self.command {
            write!(f, ", command={}", command)?;
        }
        Ok(())
    }
}

impl CommandContext {
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: Option<Command>,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

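    /// Computes the epoch up to which the MV log store may be truncated: roughly the timestamp
    /// of `prev_epoch` minus `retention_second`, falling back to `prev_epoch` itself when the
    /// subtraction does not yield a valid timestamp.
    ///
    /// A rough sketch of the arithmetic with illustrative values (not a compiled doctest;
    /// `ctx` stands in for a `CommandContext`):
    ///
    /// ```ignore
    /// // If prev_epoch encodes t = 1_700_000_000 s and retention is 3600 s, the returned
    /// // epoch corresponds to t = 1_699_996_400 s.
    /// let truncate_epoch = ctx.get_truncate_epoch(3600);
    /// ```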
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        let new_table_fragment_infos =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
            {
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
            for fragment in job_info.stream_job_fragments.fragments.values() {
                visit_stream_node_cont(&fragment.nodes, |node| {
                    match node.node_body.as_ref().unwrap() {
                        NodeBody::VectorIndexWrite(vector_index_write) => {
                            let index_table = vector_index_write.table.as_ref().unwrap();
                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                            info.vector_index_delta
                                .try_insert(
                                    index_table.id,
                                    VectorIndexDelta::Init(PbVectorIndexInit {
                                        info: Some(index_table.vector_index_info.unwrap()),
                                    }),
                                )
                                .expect("non-duplicate");
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        info.truncate_tables.extend(truncate_tables);
    }
}

impl Command {
    /// Generate a mutation for the given command.
    ///
    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor` and the `actor_upstreams` of `MergeNode`.
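    ///
    /// A rough sketch of the simplest cases (not a compiled doctest; `mgr` and `db` stand in
    /// for a `ControlStreamManager` and an `InflightDatabaseInfo`):
    ///
    /// ```ignore
    /// // `Flush` never carries a mutation.
    /// assert!(Command::Flush.to_mutation(false, &mut None, &mgr, &mut db)?.is_none());
    /// // `Pause` emits a mutation only when the cluster is not already paused.
    /// assert!(matches!(
    ///     Command::Pause.to_mutation(false, &mut None, &mgr, &mut db)?,
    ///     Some(Mutation::Pause(_))
    /// ));
    /// ```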
    pub(super) fn to_mutation(
        &self,
        is_currently_paused: bool,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        let mutation = match self {
            Command::Flush => None,

            Command::Pause => {
                // Only pause when the cluster is not already paused.
                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
                if !is_currently_paused {
                    Some(Mutation::Pause(PauseMutation {}))
                } else {
                    None
                }
            }

            Command::Resume => {
                // Only resume when the cluster is paused with the same reason.
                if is_currently_paused {
                    Some(Mutation::Resume(ResumeMutation {}))
                } else {
                    None
                }
            }

            Command::SourceChangeSplit(SplitState {
                split_assignment, ..
            }) => {
                let mut diff = HashMap::new();

                for actor_splits in split_assignment.values() {
                    diff.extend(actor_splits.clone());
                }

                Some(Mutation::Splits(SourceChangeSplitMutation {
                    actor_splits: build_actor_connector_splits(&diff),
                }))
            }

            Command::Throttle { config, .. } => {
                let mut fragment_to_apply = HashMap::new();
                for (fragment_id, limit) in config {
                    fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
                }

                Some(Mutation::Throttle(ThrottleMutation {
                    fragment_throttle: fragment_to_apply,
                }))
            }

            Command::DropStreamingJobs {
                actors,
                dropped_sink_fragment_by_targets,
                ..
            } => Some(Mutation::Stop(StopMutation {
                actors: actors.clone(),
                dropped_sink_fragments: dropped_sink_fragment_by_targets
                    .values()
                    .flatten()
                    .cloned()
                    .collect(),
            })),

            Command::CreateStreamingJob {
                info:
                    CreateStreamingJobCommandInfo {
                        stream_job_fragments,
                        init_split_assignment: split_assignment,
                        upstream_fragment_downstreams,
                        fragment_backfill_ordering,
                        ..
                    },
                job_type,
                ..
            } => {
                let edges = edges.as_mut().expect("should exist");
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits =
                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
                        database_info
                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
                    } else {
                        None
                    };

                let add_mutation = AddMutation {
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Some(Mutation::Add(add_mutation))
            }

            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                old_fragments,
                replace_upstream,
                upstream_fragment_downstreams,
                init_split_assignment,
                auto_refresh_schema_sinks,
                ..
            }) => {
                let edges = edges.as_mut().expect("should exist");
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    init_split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                )
            }

            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Removed actors in the downstream fragment should be skipped:
                        // newly created actors of the current fragment will not dispatch `Update`
                        // barriers to them.
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // We don't create dispatchers in the reschedule scenario.
1219                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

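            // Creating a subscription does not add any actors; an otherwise-empty `AddMutation`
            // is used purely to announce the new subscriber to the upstream MV's actors.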
            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
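            // Broadcast the changed connector properties, keyed by the raw id of the affected
            // object (presumably a source or sink).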
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
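            // Mutations for the refreshable-table lifecycle: `RefreshStart` kicks off a refresh,
            // while `ListFinish` and `LoadFinish` report the corresponding phases as finished;
            // each mutation carries the associated source id.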
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
        };
        Ok(mutation)
    }

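    /// Returns the new actors that this command requires, grouped by worker and fragment, or
    /// `None` if the command creates no actors.
    ///
    /// Only `CreateStreamingJob` (except snapshot backfill, which is handled separately),
    /// `RescheduleFragment` and `ReplaceStreamJob` produce new actors. The returned map is
    /// roughly shaped as
    ///
    /// ```text
    /// worker -> fragment -> (fragment node, [(actor, upstreams, dispatchers)], subscribers)
    /// ```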
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    // For snapshot backfill, the actors to create are computed separately.
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [], // no subscribers for a newly created job
                        )
                    },
                )))
            }
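            // For a reschedule, first resolve the upstream actor infos of the newly created
            // actors (taking the reschedule deltas into account), then group the new actors by
            // worker and fragment together with the fragment node and its subscribers.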
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
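            // For a replace-job, collect the actors of all new fragments; if sinks are having
            // their schema auto-refreshed, the actors of their replacement sink fragments are
            // merged in as well.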
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

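    /// Builds the `UpdateMutation` used when replacing a stream job: drop the old actors, install
    /// the new dispatchers and merge updates, re-apply the initial split assignments, and, for
    /// auto-refresh-schema sinks, attach the schema change (the original schema plus the newly
    /// added columns).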
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to be dropped.
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
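    /// Collects, for every downstream actor, the upstream actors it should receive data from,
    /// keyed by the upstream fragment id. The result is effectively
    ///
    /// ```text
    /// downstream actor -> upstream fragment -> upstream actor -> PbActorInfo
    /// ```
    ///
    /// Besides the plain dispatcher information, an optional set of reschedules can be supplied
    /// so that actors newly added by a reschedule also learn about their (surviving) upstream
    /// actors.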
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host =
                    control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
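        // When a reschedule is involved, also register the upstream actors for the downstream
        // actors that the reschedule adds, skipping any upstream actor that is itself being
        // removed by its own reschedule.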
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}