risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::ControlStreamManager;
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76    SplitState, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
79/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling the
80/// actors of a fragment, e.g. for scaling or migration.
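///
/// A minimal sketch of a scale-in plan that only removes a single actor, assuming a
/// hypothetical `old_actor_id` and leaving every other field empty:
/// ```ignore
/// let reschedule = Reschedule {
///     added_actors: HashMap::new(),
///     removed_actors: HashSet::from([old_actor_id]),
///     vnode_bitmap_updates: HashMap::new(),
///     upstream_fragment_dispatcher_ids: vec![],
///     upstream_dispatcher_mapping: None,
///     downstream_fragment_ids: vec![],
///     actor_splits: HashMap::new(),
///     newly_created_actors: HashMap::new(),
/// };
/// ```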
81#[derive(Debug, Clone)]
82pub struct Reschedule {
83    /// Added actors in this fragment.
84    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
85
86    /// Removed actors in this fragment.
87    pub removed_actors: HashSet<ActorId>,
88
89    /// Vnode bitmap updates for some actors in this fragment.
90    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
91
92    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
93    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
94    /// New hash mapping of the upstream dispatcher to be updated.
95    ///
96    /// This field exists only when there is an upstream fragment and the current fragment is
97    /// hash-sharded.
98    pub upstream_dispatcher_mapping: Option<ActorMapping>,
99
100    /// The downstream fragments of this fragment.
101    pub downstream_fragment_ids: Vec<FragmentId>,
102
103    /// Reassigned splits for source actors.
104    /// It becomes the `actor_splits` in [`UpdateMutation`].
105    /// `Source` and `SourceBackfill` are handled together here.
106    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
107
108    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
109}
110
111/// Replacing an old job with a new one. All actors in the job will be rebuilt.
112///
113/// Current use cases:
114/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
115/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
116///   will replace a table job's plan.
117#[derive(Debug, Clone)]
118pub struct ReplaceStreamJobPlan {
119    pub old_fragments: StreamJobFragments,
120    pub new_fragments: StreamJobFragmentsToCreate,
121    /// Downstream jobs of the replaced job need to update their `Merge` node to
122    /// connect to the new fragment.
123    pub replace_upstream: FragmentReplaceUpstream,
124    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
125    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
126    /// We need to reassign splits for it.
127    ///
128    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't
129    /// need to worry about `backfill_splits`.
130    pub init_split_assignment: SplitAssignment,
131    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
132    pub streaming_job: StreamingJob,
133    /// The temporary dummy job id used for the new fragments during replacement.
134    pub tmp_id: JobId,
135    /// The state table ids to be dropped.
136    pub to_drop_state_table_ids: Vec<TableId>,
137    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
138}
139
140impl ReplaceStreamJobPlan {
141    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142        let mut fragment_changes = HashMap::new();
143        for (fragment_id, new_fragment) in self
144            .new_fragments
145            .new_fragment_info(&self.init_split_assignment)
146        {
147            let fragment_change = CommandFragmentChanges::NewFragment {
148                job_id: self.streaming_job.id(),
149                info: new_fragment,
150            };
151            fragment_changes
152                .try_insert(fragment_id, fragment_change)
153                .expect("non-duplicate");
154        }
155        for fragment in self.old_fragments.fragments.values() {
156            fragment_changes
157                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158                .expect("non-duplicate");
159        }
160        for (fragment_id, replace_map) in &self.replace_upstream {
161            fragment_changes
162                .try_insert(
163                    *fragment_id,
164                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165                )
166                .expect("non-duplicate");
167        }
168        if let Some(sinks) = &self.auto_refresh_schema_sinks {
169            for sink in sinks {
170                let fragment_change = CommandFragmentChanges::NewFragment {
171                    job_id: sink.original_sink.id.as_job_id(),
172                    info: sink.new_fragment_info(),
173                };
174                fragment_changes
175                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
176                    .expect("non-duplicate");
177                fragment_changes
178                    .try_insert(
179                        sink.original_fragment.fragment_id,
180                        CommandFragmentChanges::RemoveFragment,
181                    )
182                    .expect("non-duplicate");
183            }
184        }
185        fragment_changes
186    }
187
188    /// `old_fragment_id` -> `new_fragment_id`
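    ///
    /// For example (hypothetical ids): if `replace_upstream` is
    /// `{100: {1 -> 2}, 101: {1 -> 2}}` (downstream fragment id -> {old upstream id -> new upstream id}),
    /// the result is `{1 -> 2}`. A single old fragment must always map to the same new fragment.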
189    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190        let mut fragment_replacements = HashMap::new();
191        for (upstream_fragment_id, new_upstream_fragment_id) in
192            self.replace_upstream.values().flatten()
193        {
194            {
195                let r =
196                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197                if let Some(r) = r {
198                    assert_eq!(
199                        *new_upstream_fragment_id, r,
200                        "one fragment is replaced by multiple fragments"
201                    );
202                }
203            }
204        }
205        fragment_replacements
206    }
207}
208
209#[derive(educe::Educe, Clone)]
210#[educe(Debug)]
211pub struct CreateStreamingJobCommandInfo {
212    #[educe(Debug(ignore))]
213    pub stream_job_fragments: StreamJobFragmentsToCreate,
214    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
215    pub init_split_assignment: SplitAssignment,
216    pub definition: String,
217    pub job_type: StreamingJobType,
218    pub create_type: CreateType,
219    pub streaming_job: StreamingJob,
220    pub fragment_backfill_ordering: FragmentBackfillOrder,
221    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
222    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
223}
224
225impl StreamJobFragments {
226    pub(super) fn new_fragment_info<'a>(
227        &'a self,
228        assignment: &'a SplitAssignment,
229    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
230        self.fragments.values().map(|fragment| {
231            let mut fragment_splits = assignment
232                .get(&fragment.fragment_id)
233                .cloned()
234                .unwrap_or_default();
235
236            (
237                fragment.fragment_id,
238                InflightFragmentInfo {
239                    fragment_id: fragment.fragment_id,
240                    distribution_type: fragment.distribution_type.into(),
241                    fragment_type_mask: fragment.fragment_type_mask,
242                    vnode_count: fragment.vnode_count(),
243                    nodes: fragment.nodes.clone(),
244                    actors: fragment
245                        .actors
246                        .iter()
247                        .map(|actor| {
248                            (
249                                actor.actor_id,
250                                InflightActorInfo {
251                                    worker_id: self
252                                        .actor_status
253                                        .get(&actor.actor_id)
254                                        .expect("should exist")
255                                        .worker_id(),
256                                    vnode_bitmap: actor.vnode_bitmap.clone(),
257                                    splits: fragment_splits
258                                        .remove(&actor.actor_id)
259                                        .unwrap_or_default(),
260                                },
261                            )
262                        })
263                        .collect(),
264                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
265                },
266            )
267        })
268    }
269}
270
271#[derive(Debug, Clone)]
272pub struct SnapshotBackfillInfo {
273    /// `table_id` -> `Some(snapshot_backfill_epoch)`
274    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
275    /// by the global barrier worker when handling the command.
276    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
277}
278
279#[derive(Debug, Clone)]
280pub enum CreateStreamingJobType {
281    Normal,
282    SinkIntoTable(UpstreamSinkInfo),
283    SnapshotBackfill(SnapshotBackfillInfo),
284}
285
286/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
287/// it will [build different barriers to send](Self::to_mutation),
288/// and may [perform different actions after the barrier is collected](CommandContext::post_collect).
289// FIXME: this enum is significantly large on the stack; consider boxing it
290#[derive(Debug)]
291pub enum Command {
292    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
293    /// all messages before the checkpoint barrier should have been committed.
294    Flush,
295
296    /// `Pause` command generates a `Pause` barrier **only if**
297    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
298    Pause,
299
300    /// `Resume` command generates a `Resume` barrier **only
301    /// if** the cluster is currently paused. Otherwise, a barrier with no mutation
302    /// will be generated.
303    Resume,
304
305    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
306    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
307    /// streaming jobs are safe to drop.
308    ///
309    /// Barriers from the actors to be dropped will STILL be collected.
310    /// After the barrier is collected, the local stream managers on compute nodes are notified to
311    /// drop the actors, and the job fragment info is then deleted from the meta store.
312    DropStreamingJobs {
313        streaming_job_ids: HashSet<JobId>,
314        actors: Vec<ActorId>,
315        unregistered_state_table_ids: HashSet<TableId>,
316        unregistered_fragment_ids: HashSet<FragmentId>,
317        // target_fragment -> [sink_fragments]
318        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
319    },
320
321    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
322    ///
323    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
324    /// be collected since the barrier should pass through them.
325    ///
326    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
327    /// the job fragment info is added to the meta store. However, the creation will remain **in
328    /// progress for a while**, until the `finish` channel is signaled; only then is the state of
329    /// `TableFragments` set to `Created`.
330    CreateStreamingJob {
331        info: CreateStreamingJobCommandInfo,
332        job_type: CreateStreamingJobType,
333        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
334    },
335
336    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
337    /// Mainly used for scaling and migration.
338    ///
339    /// The actors to collect barriers from, and the post-collection behavior of this command, are
340    /// very similar to those of the `Create` and `Drop` commands, for added and removed actors respectively.
341    RescheduleFragment {
342        reschedules: HashMap<FragmentId, Reschedule>,
343        // Should contain the actor ids of the upstream and downstream fragments of `reschedules`.
344        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
345    },
346
347    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This
348    /// essentially switches the downstream of the old job fragments to the new ones, and
349    /// drops the old job fragments. Used for schema change.
350    ///
351    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
352    /// fragments of the `Merge` executors are additionally changed.
353    ReplaceStreamJob(ReplaceStreamJobPlan),
354
355    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
356    /// changed splits.
357    SourceChangeSplit(SplitState),
358
359    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
360    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
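    ///
    /// A minimal sketch, assuming hypothetical `job_id`, `fragment_a` and `fragment_b` values:
    /// set a rate limit of 1000 on `fragment_a` and clear the limit on `fragment_b`:
    /// ```ignore
    /// let command = Command::Throttle {
    ///     jobs: HashSet::from([job_id]),
    ///     config: HashMap::from([(fragment_a, Some(1000)), (fragment_b, None)]),
    /// };
    /// ```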
361    Throttle {
362        jobs: HashSet<JobId>,
363        config: HashMap<FragmentId, Option<u32>>,
364    },
365
366    /// `CreateSubscription` command generates an `Add` barrier carrying the new subscription, to notify
367    /// the materialize executor to start storing old values for the subscription.
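    ///
    /// A minimal sketch, assuming hypothetical `subscription_id` and `upstream_mv_table_id` values;
    /// the resulting `AddMutation` (see `Command::to_mutation`) only populates `subscriptions_to_add`:
    /// ```ignore
    /// let command = Command::CreateSubscription {
    ///     subscription_id,
    ///     upstream_mv_table_id,
    ///     retention_second: 86_400,
    /// };
    /// ```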
368    CreateSubscription {
369        subscription_id: SubscriptionId,
370        upstream_mv_table_id: TableId,
371        retention_second: u64,
372    },
373
374    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
375    /// materialize executor to stop storing old values when there is no
376    /// subscription depending on it.
377    DropSubscription {
378        subscription_id: SubscriptionId,
379        upstream_mv_table_id: TableId,
380    },
381
382    ConnectorPropsChange(ConnectorPropsChange),
383
384    /// `Refresh` command generates a barrier to refresh a table by truncating its state
385    /// and reloading data from the source.
386    Refresh {
387        table_id: TableId,
388        associated_source_id: SourceId,
389    },
390    ListFinish {
391        table_id: TableId,
392        associated_source_id: SourceId,
393    },
394    LoadFinish {
395        table_id: TableId,
396        associated_source_id: SourceId,
397    },
398
399    /// `ResetSource` command generates a barrier to reset the CDC source offset to the latest.
400    /// Used when the upstream binlog/oplog has expired.
401    ResetSource {
402        source_id: SourceId,
403    },
404}
405
406// For debugging and observability purposes. Can add more details later if needed.
407impl std::fmt::Display for Command {
408    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
409        match self {
410            Command::Flush => write!(f, "Flush"),
411            Command::Pause => write!(f, "Pause"),
412            Command::Resume => write!(f, "Resume"),
413            Command::DropStreamingJobs {
414                streaming_job_ids, ..
415            } => {
416                write!(
417                    f,
418                    "DropStreamingJobs: {}",
419                    streaming_job_ids.iter().sorted().join(", ")
420                )
421            }
422            Command::CreateStreamingJob { info, .. } => {
423                write!(f, "CreateStreamingJob: {}", info.streaming_job)
424            }
425            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
426            Command::ReplaceStreamJob(plan) => {
427                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
428            }
429            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
430            Command::Throttle { .. } => write!(f, "Throttle"),
431            Command::CreateSubscription {
432                subscription_id, ..
433            } => write!(f, "CreateSubscription: {subscription_id}"),
434            Command::DropSubscription {
435                subscription_id, ..
436            } => write!(f, "DropSubscription: {subscription_id}"),
437            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
438            Command::Refresh {
439                table_id,
440                associated_source_id,
441            } => write!(
442                f,
443                "Refresh: {} (source: {})",
444                table_id, associated_source_id
445            ),
446            Command::ListFinish {
447                table_id,
448                associated_source_id,
449            } => write!(
450                f,
451                "ListFinish: {} (source: {})",
452                table_id, associated_source_id
453            ),
454            Command::LoadFinish {
455                table_id,
456                associated_source_id,
457            } => write!(
458                f,
459                "LoadFinish: {} (source: {})",
460                table_id, associated_source_id
461            ),
462            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
463        }
464    }
465}
466
467impl Command {
468    pub fn pause() -> Self {
469        Self::Pause
470    }
471
472    pub fn resume() -> Self {
473        Self::Resume
474    }
475
476    #[expect(clippy::type_complexity)]
477    pub(super) fn fragment_changes(
478        &self,
479    ) -> Option<(
480        Option<(JobId, Option<CdcTableBackfillTracker>)>,
481        HashMap<FragmentId, CommandFragmentChanges>,
482    )> {
483        match self {
484            Command::Flush => None,
485            Command::Pause => None,
486            Command::Resume => None,
487            Command::DropStreamingJobs {
488                unregistered_fragment_ids,
489                dropped_sink_fragment_by_targets,
490                ..
491            } => {
492                let changes = unregistered_fragment_ids
493                    .iter()
494                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
495                    .chain(dropped_sink_fragment_by_targets.iter().map(
496                        |(target_fragment, sink_fragments)| {
497                            (
498                                *target_fragment,
499                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
500                            )
501                        },
502                    ))
503                    .collect();
504
505                Some((None, changes))
506            }
507            Command::CreateStreamingJob { info, job_type, .. } => {
508                assert!(
509                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
510                    "should handle fragment changes separately for snapshot backfill"
511                );
512                let mut changes: HashMap<_, _> = info
513                    .stream_job_fragments
514                    .new_fragment_info(&info.init_split_assignment)
515                    .map(|(fragment_id, fragment_infos)| {
516                        (
517                            fragment_id,
518                            CommandFragmentChanges::NewFragment {
519                                job_id: info.streaming_job.id(),
520                                info: fragment_infos,
521                            },
522                        )
523                    })
524                    .collect();
525
526                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
527                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
528                    changes.insert(
529                        downstream_fragment_id,
530                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
531                            upstream_fragment_id: ctx.sink_fragment_id,
532                            sink_output_schema: ctx.sink_output_fields.clone(),
533                            project_exprs: ctx.project_exprs.clone(),
534                        }),
535                    );
536                }
537
538                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
539                    let (fragment, _) =
540                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
541                            .expect("should have parallel cdc fragment");
542                    Some(CdcTableBackfillTracker::new(
543                        fragment.fragment_id,
544                        splits.clone(),
545                    ))
546                } else {
547                    None
548                };
549
550                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
551            }
552            Command::RescheduleFragment { reschedules, .. } => Some((
553                None,
554                reschedules
555                    .iter()
556                    .map(|(fragment_id, reschedule)| {
557                        (
558                            *fragment_id,
559                            CommandFragmentChanges::Reschedule {
560                                new_actors: reschedule
561                                    .added_actors
562                                    .iter()
563                                    .flat_map(|(node_id, actors)| {
564                                        actors.iter().map(|actor_id| {
565                                            (
566                                                *actor_id,
567                                                InflightActorInfo {
568                                                    worker_id: *node_id,
569                                                    vnode_bitmap: reschedule
570                                                        .newly_created_actors
571                                                        .get(actor_id)
572                                                        .expect("should exist")
573                                                        .0
574                                                        .0
575                                                        .vnode_bitmap
576                                                        .clone(),
577                                                    splits: reschedule
578                                                        .actor_splits
579                                                        .get(actor_id)
580                                                        .cloned()
581                                                        .unwrap_or_default(),
582                                                },
583                                            )
584                                        })
585                                    })
586                                    .collect(),
587                                actor_update_vnode_bitmap: reschedule
588                                    .vnode_bitmap_updates
589                                    .iter()
590                                    .filter(|(actor_id, _)| {
591                                        // only keep the existing actors
592                                        !reschedule.newly_created_actors.contains_key(actor_id)
593                                    })
594                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
595                                    .collect(),
596                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
597                                actor_splits: reschedule.actor_splits.clone(),
598                            },
599                        )
600                    })
601                    .collect(),
602            )),
603            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
604            Command::SourceChangeSplit(SplitState {
605                split_assignment, ..
606            }) => Some((
607                None,
608                split_assignment
609                    .iter()
610                    .map(|(&fragment_id, splits)| {
611                        (
612                            fragment_id,
613                            CommandFragmentChanges::SplitAssignment {
614                                actor_splits: splits.clone(),
615                            },
616                        )
617                    })
618                    .collect(),
619            )),
620            Command::Throttle { .. } => None,
621            Command::CreateSubscription { .. } => None,
622            Command::DropSubscription { .. } => None,
623            Command::ConnectorPropsChange(_) => None,
624            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
625            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
626            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
627            Command::ResetSource { .. } => None, // ResetSource doesn't change fragment structure
628        }
629    }
630
631    pub fn need_checkpoint(&self) -> bool {
632        // TODO: review the flow of different commands to reduce the number of checkpoints.
633        !matches!(self, Command::Resume)
634    }
635}
636
637#[derive(Debug, Clone)]
638pub enum BarrierKind {
639    Initial,
640    Barrier,
641    /// Holds the prev-epochs of the preceding non-checkpoint barriers plus the current prev-epoch.
642    Checkpoint(Vec<u64>),
643}
644
645impl BarrierKind {
646    pub fn to_protobuf(&self) -> PbBarrierKind {
647        match self {
648            BarrierKind::Initial => PbBarrierKind::Initial,
649            BarrierKind::Barrier => PbBarrierKind::Barrier,
650            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
651        }
652    }
653
654    pub fn is_checkpoint(&self) -> bool {
655        matches!(self, BarrierKind::Checkpoint(_))
656    }
657
658    pub fn is_initial(&self) -> bool {
659        matches!(self, BarrierKind::Initial)
660    }
661
662    pub fn as_str_name(&self) -> &'static str {
663        match self {
664            BarrierKind::Initial => "Initial",
665            BarrierKind::Barrier => "Barrier",
666            BarrierKind::Checkpoint(_) => "Checkpoint",
667        }
668    }
669}
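
// A minimal sketch of how `BarrierKind` behaves, written as a test for illustration.
// It relies only on the methods defined above and the imported `PbBarrierKind`.
#[cfg(test)]
mod barrier_kind_sketch_tests {
    use super::*;

    #[test]
    fn checkpoint_kind_behavior() {
        // A checkpoint barrier carries the prev-epochs of the preceding non-checkpoint
        // barriers plus its own prev-epoch.
        let kind = BarrierKind::Checkpoint(vec![1, 2, 3]);
        assert!(kind.is_checkpoint());
        assert!(!kind.is_initial());
        assert_eq!(kind.as_str_name(), "Checkpoint");
        assert_eq!(kind.to_protobuf(), PbBarrierKind::Checkpoint);
    }
}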
670
671/// [`CommandContext`] is used for generating the barrier and doing the post-collection work for the
672/// given [`Command`].
673pub(super) struct CommandContext {
674    mv_subscription_max_retention: HashMap<TableId, u64>,
675
676    pub(super) barrier_info: BarrierInfo,
677
678    pub(super) table_ids_to_commit: HashSet<TableId>,
679
680    pub(super) command: Option<Command>,
681
682    /// The tracing span of this command.
683    ///
684    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
685    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
686    /// stream graph on compute nodes, and finishing its `post_collect` work.
687    _span: tracing::Span,
688}
689
690impl std::fmt::Debug for CommandContext {
691    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
692        f.debug_struct("CommandContext")
693            .field("barrier_info", &self.barrier_info)
694            .field("command", &self.command)
695            .finish()
696    }
697}
698
699impl std::fmt::Display for CommandContext {
700    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
701        write!(
702            f,
703            "prev_epoch={}, curr_epoch={}, kind={}",
704            self.barrier_info.prev_epoch(),
705            self.barrier_info.curr_epoch(),
706            self.barrier_info.kind.as_str_name()
707        )?;
708        if let Some(command) = &self.command {
709            write!(f, ", command={}", command)?;
710        }
711        Ok(())
712    }
713}
714
715impl CommandContext {
716    pub(super) fn new(
717        barrier_info: BarrierInfo,
718        mv_subscription_max_retention: HashMap<TableId, u64>,
719        table_ids_to_commit: HashSet<TableId>,
720        command: Option<Command>,
721        span: tracing::Span,
722    ) -> Self {
723        Self {
724            mv_subscription_max_retention,
725            barrier_info,
726            table_ids_to_commit,
727            command,
728            _span: span,
729        }
730    }
731
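    /// Compute the epoch up to which an upstream mv's log store may be truncated for a
    /// subscription: roughly the physical time of `prev_epoch` minus `retention_second`.
    ///
    /// A worked example with hypothetical numbers: if `prev_epoch` corresponds to Unix time
    /// `1_700_000_000` seconds and `retention_second` is `3_600`, the result corresponds to
    /// Unix time `1_699_996_400` seconds, i.e. `Epoch::from_unix_millis(1_699_996_400_000)`.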
732    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
733        let Some(truncate_timestamptz) = Timestamptz::from_secs(
734            self.barrier_info
735                .prev_epoch
736                .value()
737                .as_timestamptz()
738                .timestamp()
739                - retention_second as i64,
740        ) else {
741            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
742            return self.barrier_info.prev_epoch.value();
743        };
744        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
745    }
746
747    pub(super) fn collect_commit_epoch_info(
748        &self,
749        info: &mut CommitEpochInfo,
750        resps: Vec<BarrierCompleteResponse>,
751        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
752    ) {
753        let (
754            sst_to_context,
755            synced_ssts,
756            new_table_watermarks,
757            old_value_ssts,
758            vector_index_adds,
759            truncate_tables,
760        ) = collect_resp_info(resps);
761
762        let new_table_fragment_infos =
763            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
764                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
765            {
766                let table_fragments = &info.stream_job_fragments;
767                let mut table_ids: HashSet<_> =
768                    table_fragments.internal_table_ids().into_iter().collect();
769                if let Some(mv_table_id) = table_fragments.mv_table_id() {
770                    table_ids.insert(mv_table_id);
771                }
772
773                vec![NewTableFragmentInfo { table_ids }]
774            } else {
775                vec![]
776            };
777
778        let mut mv_log_store_truncate_epoch = HashMap::new();
779        // TODO: may collect cross db snapshot backfill
780        let mut update_truncate_epoch =
781            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
782                Entry::Occupied(mut entry) => {
783                    let prev_truncate_epoch = entry.get_mut();
784                    if truncate_epoch < *prev_truncate_epoch {
785                        *prev_truncate_epoch = truncate_epoch;
786                    }
787                }
788                Entry::Vacant(entry) => {
789                    entry.insert(truncate_epoch);
790                }
791            };
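        // For example (hypothetical epochs): if a subscription allows truncating an upstream mv's
        // log up to epoch 100, but a snapshot backfill still pins epoch 80 of the same table, the
        // entry ends up as 80, i.e. the smallest (most conservative) truncate epoch wins.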
792        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
793            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
794            update_truncate_epoch(*mv_table_id, truncate_epoch);
795        }
796        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
797            for mv_table_id in upstream_mv_table_ids {
798                update_truncate_epoch(mv_table_id, backfill_epoch);
799            }
800        }
801
802        let table_new_change_log = build_table_change_log_delta(
803            old_value_ssts.into_iter(),
804            synced_ssts.iter().map(|sst| &sst.sst_info),
805            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
806            mv_log_store_truncate_epoch.into_iter(),
807        );
808
809        let epoch = self.barrier_info.prev_epoch();
810        for table_id in &self.table_ids_to_commit {
811            info.tables_to_commit
812                .try_insert(*table_id, epoch)
813                .expect("non duplicate");
814        }
815
816        info.sstables.extend(synced_ssts);
817        info.new_table_watermarks.extend(new_table_watermarks);
818        info.sst_to_context.extend(sst_to_context);
819        info.new_table_fragment_infos
820            .extend(new_table_fragment_infos);
821        info.change_log_delta.extend(table_new_change_log);
822        for (table_id, vector_index_adds) in vector_index_adds {
823            info.vector_index_delta
824                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
825                .expect("non-duplicate");
826        }
827        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
828            for fragment in job_info.stream_job_fragments.fragments.values() {
829                visit_stream_node_cont(&fragment.nodes, |node| {
830                    match node.node_body.as_ref().unwrap() {
831                        NodeBody::VectorIndexWrite(vector_index_write) => {
832                            let index_table = vector_index_write.table.as_ref().unwrap();
833                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
834                            info.vector_index_delta
835                                .try_insert(
836                                    index_table.id,
837                                    VectorIndexDelta::Init(PbVectorIndexInit {
838                                        info: Some(index_table.vector_index_info.unwrap()),
839                                    }),
840                                )
841                                .expect("non-duplicate");
842                            false
843                        }
844                        _ => true,
845                    }
846                })
847            }
848        }
849        info.truncate_tables.extend(truncate_tables);
850    }
851}
852
853impl Command {
854    /// Generate a mutation for the given command.
855    ///
856    /// `edges` contains the `dispatcher`s of `DispatchExecutor`s and the `actor_upstreams` of `MergeNode`s.
857    pub(super) fn to_mutation(
858        &self,
859        is_currently_paused: bool,
860        edges: &mut Option<FragmentEdgeBuildResult>,
861        control_stream_manager: &ControlStreamManager,
862        database_info: &mut InflightDatabaseInfo,
863    ) -> MetaResult<Option<Mutation>> {
864        let mutation = match self {
865            Command::Flush => None,
866
867            Command::Pause => {
868                // Only pause when the cluster is not already paused.
869                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
870                if !is_currently_paused {
871                    Some(Mutation::Pause(PauseMutation {}))
872                } else {
873                    None
874                }
875            }
876
877            Command::Resume => {
878                // Only resume when the cluster is currently paused.
879                if is_currently_paused {
880                    Some(Mutation::Resume(ResumeMutation {}))
881                } else {
882                    None
883                }
884            }
885
886            Command::SourceChangeSplit(SplitState {
887                split_assignment, ..
888            }) => {
889                let mut diff = HashMap::new();
890
891                for actor_splits in split_assignment.values() {
892                    diff.extend(actor_splits.clone());
893                }
894
895                Some(Mutation::Splits(SourceChangeSplitMutation {
896                    actor_splits: build_actor_connector_splits(&diff),
897                }))
898            }
899
900            Command::Throttle { config, .. } => {
901                let mut fragment_to_apply = HashMap::new();
902                for (fragment_id, limit) in config {
903                    fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
904                }
905
906                Some(Mutation::Throttle(ThrottleMutation {
907                    fragment_throttle: fragment_to_apply,
908                }))
909            }
910
911            Command::DropStreamingJobs {
912                actors,
913                dropped_sink_fragment_by_targets,
914                ..
915            } => Some(Mutation::Stop(StopMutation {
916                actors: actors.clone(),
917                dropped_sink_fragments: dropped_sink_fragment_by_targets
918                    .values()
919                    .flatten()
920                    .cloned()
921                    .collect(),
922            })),
923
924            Command::CreateStreamingJob {
925                info:
926                    CreateStreamingJobCommandInfo {
927                        stream_job_fragments,
928                        init_split_assignment: split_assignment,
929                        upstream_fragment_downstreams,
930                        fragment_backfill_ordering,
931                        ..
932                    },
933                job_type,
934                ..
935            } => {
936                let edges = edges.as_mut().expect("should exist");
937                let added_actors = stream_job_fragments.actor_ids().collect();
938                let actor_splits = split_assignment
939                    .values()
940                    .flat_map(build_actor_connector_splits)
941                    .collect();
942                let subscriptions_to_add =
943                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
944                        job_type
945                    {
946                        snapshot_backfill_info
947                            .upstream_mv_table_id_to_backfill_epoch
948                            .keys()
949                            .map(|table_id| SubscriptionUpstreamInfo {
950                                subscriber_id: stream_job_fragments
951                                    .stream_job_id()
952                                    .as_subscriber_id(),
953                                upstream_mv_table_id: *table_id,
954                            })
955                            .collect()
956                    } else {
957                        Default::default()
958                    };
959                let backfill_nodes_to_pause: Vec<_> =
960                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
961                        .into_iter()
962                        .collect();
963
964                let new_upstream_sinks =
965                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
966                        sink_fragment_id,
967                        sink_output_fields,
968                        project_exprs,
969                        new_sink_downstream,
970                        ..
971                    }) = job_type
972                    {
973                        let new_sink_actors = stream_job_fragments
974                            .actors_to_create()
975                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
976                            .exactly_one()
977                            .map(|(_, _, actors)| {
978                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
979                                    actor_id: actor.actor_id,
980                                    host: Some(control_stream_manager.host_addr(worker_id)),
981                                })
982                            })
983                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
984                        let new_upstream_sink = PbNewUpstreamSink {
985                            info: Some(PbUpstreamSinkInfo {
986                                upstream_fragment_id: *sink_fragment_id,
987                                sink_output_schema: sink_output_fields.clone(),
988                                project_exprs: project_exprs.clone(),
989                            }),
990                            upstream_actors: new_sink_actors.collect(),
991                        };
992                        HashMap::from([(
993                            new_sink_downstream.downstream_fragment_id,
994                            new_upstream_sink,
995                        )])
996                    } else {
997                        HashMap::new()
998                    };
999
1000                let actor_cdc_table_snapshot_splits =
1001                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1002                        database_info
1003                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1004                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1005                    } else {
1006                        None
1007                    };
1008
1009                let add_mutation = AddMutation {
1010                    actor_dispatchers: edges
1011                        .dispatchers
1012                        .extract_if(|fragment_id, _| {
1013                            upstream_fragment_downstreams.contains_key(fragment_id)
1014                        })
1015                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1016                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1017                        .collect(),
1018                    added_actors,
1019                    actor_splits,
1020                    // If the cluster is already paused, the new actors should be paused too.
1021                    pause: is_currently_paused,
1022                    subscriptions_to_add,
1023                    backfill_nodes_to_pause,
1024                    actor_cdc_table_snapshot_splits,
1025                    new_upstream_sinks,
1026                };
1027
1028                Some(Mutation::Add(add_mutation))
1029            }
1030
1031            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1032                old_fragments,
1033                replace_upstream,
1034                upstream_fragment_downstreams,
1035                init_split_assignment,
1036                auto_refresh_schema_sinks,
1037                ..
1038            }) => {
1039                let edges = edges.as_mut().expect("should exist");
1040                let merge_updates = edges
1041                    .merge_updates
1042                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1043                    .collect();
1044                let dispatchers = edges
1045                    .dispatchers
1046                    .extract_if(|fragment_id, _| {
1047                        upstream_fragment_downstreams.contains_key(fragment_id)
1048                    })
1049                    .collect();
1050                let actor_cdc_table_snapshot_splits = database_info
1051                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1052                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1053                Self::generate_update_mutation_for_replace_table(
1054                    old_fragments.actor_ids().chain(
1055                        auto_refresh_schema_sinks
1056                            .as_ref()
1057                            .into_iter()
1058                            .flat_map(|sinks| {
1059                                sinks.iter().flat_map(|sink| {
1060                                    sink.original_fragment
1061                                        .actors
1062                                        .iter()
1063                                        .map(|actor| actor.actor_id)
1064                                })
1065                            }),
1066                    ),
1067                    merge_updates,
1068                    dispatchers,
1069                    init_split_assignment,
1070                    actor_cdc_table_snapshot_splits,
1071                    auto_refresh_schema_sinks.as_ref(),
1072                )
1073            }
1074
1075            Command::RescheduleFragment {
1076                reschedules,
1077                fragment_actors,
1078                ..
1079            } => {
1080                let mut dispatcher_update = HashMap::new();
1081                for reschedule in reschedules.values() {
1082                    for &(upstream_fragment_id, dispatcher_id) in
1083                        &reschedule.upstream_fragment_dispatcher_ids
1084                    {
1085                        // Find the actors of the upstream fragment.
1086                        let upstream_actor_ids = fragment_actors
1087                            .get(&upstream_fragment_id)
1088                            .expect("should contain");
1089
1090                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1091
1092                        // Record updates for all actors.
1093                        for &actor_id in upstream_actor_ids {
1094                            let added_downstream_actor_id = if upstream_reschedule
1095                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1096                                .unwrap_or(true)
1097                            {
1098                                reschedule
1099                                    .added_actors
1100                                    .values()
1101                                    .flatten()
1102                                    .cloned()
1103                                    .collect()
1104                            } else {
1105                                Default::default()
1106                            };
1107                            // Index with the dispatcher id to check duplicates.
1108                            dispatcher_update
1109                                .try_insert(
1110                                    (actor_id, dispatcher_id),
1111                                    DispatcherUpdate {
1112                                        actor_id,
1113                                        dispatcher_id,
1114                                        hash_mapping: reschedule
1115                                            .upstream_dispatcher_mapping
1116                                            .as_ref()
1117                                            .map(|m| m.to_protobuf()),
1118                                        added_downstream_actor_id,
1119                                        removed_downstream_actor_id: reschedule
1120                                            .removed_actors
1121                                            .iter()
1122                                            .cloned()
1123                                            .collect(),
1124                                    },
1125                                )
1126                                .unwrap();
1127                        }
1128                    }
1129                }
1130                let dispatcher_update = dispatcher_update.into_values().collect();
1131
1132                let mut merge_update = HashMap::new();
1133                for (&fragment_id, reschedule) in reschedules {
1134                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1135                        // Find the actors of the downstream fragment.
1136                        let downstream_actor_ids = fragment_actors
1137                            .get(&downstream_fragment_id)
1138                            .expect("should contain");
1139
1140                        // Actors removed from the downstream fragment should be skipped:
1141                        // newly created actors of the current fragment will not dispatch `Update`
1142                        // barriers to them.
1143                        let downstream_removed_actors: HashSet<_> = reschedules
1144                            .get(&downstream_fragment_id)
1145                            .map(|downstream_reschedule| {
1146                                downstream_reschedule
1147                                    .removed_actors
1148                                    .iter()
1149                                    .copied()
1150                                    .collect()
1151                            })
1152                            .unwrap_or_default();
1153
1154                        // Record updates for all actors.
1155                        for &actor_id in downstream_actor_ids {
1156                            if downstream_removed_actors.contains(&actor_id) {
1157                                continue;
1158                            }
1159
1160                            // Index with the fragment id to check duplicates.
1161                            merge_update
1162                                .try_insert(
1163                                    (actor_id, fragment_id),
1164                                    MergeUpdate {
1165                                        actor_id,
1166                                        upstream_fragment_id: fragment_id,
1167                                        new_upstream_fragment_id: None,
1168                                        added_upstream_actors: reschedule
1169                                            .added_actors
1170                                            .iter()
1171                                            .flat_map(|(worker_id, actors)| {
1172                                                let host =
1173                                                    control_stream_manager.host_addr(*worker_id);
1174                                                actors.iter().map(move |&actor_id| PbActorInfo {
1175                                                    actor_id,
1176                                                    host: Some(host.clone()),
1177                                                })
1178                                            })
1179                                            .collect(),
1180                                        removed_upstream_actor_id: reschedule
1181                                            .removed_actors
1182                                            .iter()
1183                                            .cloned()
1184                                            .collect(),
1185                                    },
1186                                )
1187                                .unwrap();
1188                        }
1189                    }
1190                }
1191                let merge_update = merge_update.into_values().collect();
1192
1193                let mut actor_vnode_bitmap_update = HashMap::new();
1194                for reschedule in reschedules.values() {
1195                    // Record updates for all actors in this fragment.
1196                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1197                        let bitmap = bitmap.to_protobuf();
1198                        actor_vnode_bitmap_update
1199                            .try_insert(actor_id, bitmap)
1200                            .unwrap();
1201                    }
1202                }
1203                let dropped_actors = reschedules
1204                    .values()
1205                    .flat_map(|r| r.removed_actors.iter().copied())
1206                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // No new dispatchers are created in the reschedule scenario.
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

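            // Creating a subscription only registers it via `subscriptions_to_add`; no actors,
            // dispatchers or splits are added.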
            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: vec![],
                actor_splits: Default::default(),
                pause: false,
                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
                    upstream_mv_table_id: *upstream_mv_table_id,
                    subscriber_id: subscription_id.as_subscriber_id(),
                }],
                backfill_nodes_to_pause: vec![],
                actor_cdc_table_snapshot_splits: None,
                new_upstream_sinks: Default::default(),
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
                info: vec![SubscriptionUpstreamInfo {
                    subscriber_id: subscription_id.as_subscriber_id(),
                    upstream_mv_table_id: *upstream_mv_table_id,
                }],
            })),
            Command::ConnectorPropsChange(config) => {
                let mut connector_props_infos = HashMap::default();
                for (k, v) in config {
                    connector_props_infos.insert(
                        k.as_raw_id(),
                        ConnectorPropsInfo {
                            connector_props_info: v.clone(),
                        },
                    );
                }
                Some(Mutation::ConnectorPropsChange(
                    ConnectorPropsChangeMutation {
                        connector_props_infos,
                    },
                ))
            }
            Command::Refresh {
                table_id,
                associated_source_id,
            } => Some(Mutation::RefreshStart(
                risingwave_pb::stream_plan::RefreshStartMutation {
                    table_id: *table_id,
                    associated_source_id: *associated_source_id,
                },
            )),
            Command::ListFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::ListFinish(ListFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::LoadFinish {
                table_id: _,
                associated_source_id,
            } => Some(Mutation::LoadFinish(LoadFinishMutation {
                associated_source_id: *associated_source_id,
            })),
            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
                risingwave_pb::stream_plan::ResetSourceMutation {
                    source_id: source_id.as_raw_id(),
                },
            )),
        };
        Ok(mutation)
    }

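    /// Collects the new actors that this command requires to be built, grouped by worker and
    /// then by fragment. Returns `None` for commands that do not create any actors.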
    pub(super) fn actors_to_create(
        &self,
        database_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<StreamJobActorsToCreate> {
        match self {
            Command::CreateStreamingJob { info, job_type, .. } => {
                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
                    // For snapshot backfill, the actors to create are handled separately.
                    return None;
                }
                let actors_to_create = info.stream_job_fragments.actors_to_create();
                let edges = edges.as_mut().expect("should exist");
                Some(edges.collect_actors_to_create(actors_to_create.map(
                    |(fragment_id, node, actors)| {
                        (
                            fragment_id,
                            node,
                            actors,
                            [], // no subscribers for a newly-created job
                        )
                    },
                )))
            }
            Command::RescheduleFragment {
                reschedules,
                fragment_actors,
                ..
            } => {
                let mut actor_upstreams = Self::collect_actor_upstreams(
                    reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.newly_created_actors.values().map(
                                |((actor, dispatchers), _)| {
                                    (actor.actor_id, dispatchers.as_slice())
                                },
                            ),
                        )
                    }),
                    Some((reschedules, fragment_actors)),
                    database_info,
                    control_stream_manager,
                );
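                // Group the newly created actors by worker and fragment, attaching each actor's
                // upstream info and dispatchers.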
                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
                for (fragment_id, (actor, dispatchers), worker_id) in
                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
                        reschedule
                            .newly_created_actors
                            .values()
                            .map(|(actors, status)| (*fragment_id, actors, status))
                    })
                {
                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                    map.entry(*worker_id)
                        .or_default()
                        .entry(fragment_id)
                        .or_insert_with(|| {
                            let node = database_info.fragment(fragment_id).nodes.clone();
                            let subscribers =
                                database_info.fragment_subscribers(fragment_id).collect();
                            (node, vec![], subscribers)
                        })
                        .1
                        .push((actor.clone(), upstreams, dispatchers.clone()));
                }
                Some(map)
            }
            Command::ReplaceStreamJob(replace_table) => {
                let edges = edges.as_mut().expect("should exist");
                let mut actors = edges.collect_actors_to_create(
                    replace_table.new_fragments.actors_to_create().map(
                        |(fragment_id, node, actors)| {
                            (
                                fragment_id,
                                node,
                                actors,
                                database_info
                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
                            )
                        },
                    ),
                );
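                // Sinks whose schema is auto-refreshed are replaced with new fragments as well;
                // build their actors alongside the replaced job's actors.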
                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
                        (
                            sink.new_fragment.fragment_id,
                            &sink.new_fragment.nodes,
                            sink.new_fragment.actors.iter().map(|actor| {
                                (
                                    actor,
                                    sink.actor_status[&actor.actor_id]
                                        .location
                                        .as_ref()
                                        .unwrap()
                                        .worker_node_id,
                                )
                            }),
                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
                        )
                    }));
                    for (worker_id, fragment_actors) in sink_actors {
                        actors.entry(worker_id).or_default().extend(fragment_actors);
                    }
                }
                Some(actors)
            }
            _ => None,
        }
    }

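    /// Builds the update mutation for replacing a stream job: rewires downstream merges via
    /// `merge_update`, drops the old actors, and carries the new dispatchers, split assignments
    /// and (if any) sink schema changes.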
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        init_split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        let actor_splits = init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
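            // For auto-refresh-schema sinks, attach the original sink schema together with the
            // newly added columns as an `AddColumns` schema change.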
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                    fields: sink
                                        .newly_add_fields
                                        .iter()
                                        .map(|field| field.to_prost())
                                        .collect(),
                                })),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }

    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to be dropped.
    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
        match self {
            Command::DropStreamingJobs {
                streaming_job_ids, ..
            } => Some(streaming_job_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

impl Command {
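    /// Collects, for each downstream actor, the upstream actors (with their hosts) it should
    /// receive data from, derived from the given dispatchers and, for reschedules, from the
    /// surviving actors of the upstream fragments.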
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                            },
                        );
                }
            }
        }
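        // For a reschedule, the newly added actors also need upstream info from the existing
        // actors of their upstream fragments, skipping upstream actors that are being removed.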
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}