risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
37};
38use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
39use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
40use risingwave_pb::stream_plan::barrier_mutation::Mutation;
41use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
42use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
43use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
44use risingwave_pb::stream_plan::update_mutation::*;
45use risingwave_pb::stream_plan::{
46    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
47    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
48    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
49    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
50};
51use risingwave_pb::stream_service::BarrierCompleteResponse;
52use tracing::warn;
53
54use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
55use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
56use crate::barrier::cdc_progress::CdcTableBackfillTracker;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
63use crate::manager::{StreamingJob, StreamingJobType};
64use crate::model::{
65    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
66    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
67    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
68};
69use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
70use crate::stream::{
71    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
72    SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
73};
74use crate::{MetaError, MetaResult};
75
/// [`Reschedule`] is for [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in a fragment, e.g., for scaling or migration.
78#[derive(Debug, Clone)]
79pub struct Reschedule {
80    /// Added actors in this fragment.
81    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
82
83    /// Removed actors in this fragment.
84    pub removed_actors: HashSet<ActorId>,
85
86    /// Vnode bitmap updates for some actors in this fragment.
87    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
88
89    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
90    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
91    /// New hash mapping of the upstream dispatcher to be updated.
92    ///
    /// This field exists only when there's an upstream fragment and the current fragment is
    /// hash-sharded.
95    pub upstream_dispatcher_mapping: Option<ActorMapping>,
96
97    /// The downstream fragments of this fragment.
98    pub downstream_fragment_ids: Vec<FragmentId>,
99
100    /// Reassigned splits for source actors.
101    /// It becomes the `actor_splits` in [`UpdateMutation`].
102    /// `Source` and `SourceBackfill` are handled together here.
103    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
104
105    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
106}
107
108/// Replacing an old job with a new one. All actors in the job will be rebuilt.
109///
110/// Current use cases:
111/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
112/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
113///   will replace a table job's plan.
114#[derive(Debug, Clone)]
115pub struct ReplaceStreamJobPlan {
116    pub old_fragments: StreamJobFragments,
117    pub new_fragments: StreamJobFragmentsToCreate,
118    /// Downstream jobs of the replaced job need to update their `Merge` node to
119    /// connect to the new fragment.
120    pub replace_upstream: FragmentReplaceUpstream,
121    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for tables with a connector, so we don't
    /// need to worry about `backfill_splits`.
127    pub init_split_assignment: SplitAssignment,
128    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
129    pub streaming_job: StreamingJob,
    /// The id of the temporary dummy job used for the new table fragments.
131    pub tmp_id: JobId,
132    /// The state table ids to be dropped.
133    pub to_drop_state_table_ids: Vec<TableId>,
134    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
135}
136
137impl ReplaceStreamJobPlan {
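    /// Per-fragment changes applied by this replacement: add the new fragments, remove the old
    /// ones, and rewrite the `Merge` upstreams of downstream fragments (including the fragments
    /// of auto-refresh-schema sinks, if any).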
138    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
139        let mut fragment_changes = HashMap::new();
140        for (fragment_id, new_fragment) in self
141            .new_fragments
142            .new_fragment_info(&self.init_split_assignment)
143        {
144            let fragment_change = CommandFragmentChanges::NewFragment {
145                job_id: self.streaming_job.id(),
146                info: new_fragment,
147            };
148            fragment_changes
149                .try_insert(fragment_id, fragment_change)
150                .expect("non-duplicate");
151        }
152        for fragment in self.old_fragments.fragments.values() {
153            fragment_changes
154                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
155                .expect("non-duplicate");
156        }
157        for (fragment_id, replace_map) in &self.replace_upstream {
158            fragment_changes
159                .try_insert(
160                    *fragment_id,
161                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
162                )
163                .expect("non-duplicate");
164        }
165        if let Some(sinks) = &self.auto_refresh_schema_sinks {
166            for sink in sinks {
167                let fragment_change = CommandFragmentChanges::NewFragment {
168                    job_id: sink.original_sink.id.as_job_id(),
169                    info: sink.new_fragment_info(),
170                };
171                fragment_changes
172                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
173                    .expect("non-duplicate");
174                fragment_changes
175                    .try_insert(
176                        sink.original_fragment.fragment_id,
177                        CommandFragmentChanges::RemoveFragment,
178                    )
179                    .expect("non-duplicate");
180            }
181        }
182        fragment_changes
183    }
184
185    /// `old_fragment_id` -> `new_fragment_id`
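    ///
    /// For example, if a downstream `Merge` switches its upstream from fragment `3` to the new
    /// fragment `5`, the returned map contains the entry `3 -> 5`.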
186    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
187        let mut fragment_replacements = HashMap::new();
188        for (upstream_fragment_id, new_upstream_fragment_id) in
189            self.replace_upstream.values().flatten()
190        {
191            {
192                let r =
193                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
194                if let Some(r) = r {
195                    assert_eq!(
196                        *new_upstream_fragment_id, r,
197                        "one fragment is replaced by multiple fragments"
198                    );
199                }
200            }
201        }
202        fragment_replacements
203    }
204}
205
206#[derive(educe::Educe, Clone)]
207#[educe(Debug)]
208pub struct CreateStreamingJobCommandInfo {
209    #[educe(Debug(ignore))]
210    pub stream_job_fragments: StreamJobFragmentsToCreate,
211    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
212    pub init_split_assignment: SplitAssignment,
213    pub definition: String,
214    pub job_type: StreamingJobType,
215    pub create_type: CreateType,
216    pub streaming_job: StreamingJob,
217    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
218    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
219    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
220    pub is_serverless: bool,
221}
222
223impl StreamJobFragments {
224    pub(super) fn new_fragment_info<'a>(
225        &'a self,
226        assignment: &'a SplitAssignment,
227    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
228        self.fragments.values().map(|fragment| {
229            let mut fragment_splits = assignment
230                .get(&fragment.fragment_id)
231                .cloned()
232                .unwrap_or_default();
233
234            (
235                fragment.fragment_id,
236                InflightFragmentInfo {
237                    fragment_id: fragment.fragment_id,
238                    distribution_type: fragment.distribution_type.into(),
239                    fragment_type_mask: fragment.fragment_type_mask,
240                    vnode_count: fragment.vnode_count(),
241                    nodes: fragment.nodes.clone(),
242                    actors: fragment
243                        .actors
244                        .iter()
245                        .map(|actor| {
246                            (
247                                actor.actor_id,
248                                InflightActorInfo {
249                                    worker_id: self
250                                        .actor_status
251                                        .get(&actor.actor_id)
252                                        .expect("should exist")
253                                        .worker_id(),
254                                    vnode_bitmap: actor.vnode_bitmap.clone(),
255                                    splits: fragment_splits
256                                        .remove(&actor.actor_id)
257                                        .unwrap_or_default(),
258                                },
259                            )
260                        })
261                        .collect(),
262                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
263                },
264            )
265        })
266    }
267}
268
269#[derive(Debug, Clone)]
270pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    ///
    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
    /// by the global barrier worker when handling the command.
274    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
275}
276
277#[derive(Debug, Clone)]
278pub enum CreateStreamingJobType {
279    Normal,
280    SinkIntoTable(UpstreamSinkInfo),
281    SnapshotBackfill(SnapshotBackfillInfo),
282}
283
284/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
285/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](PostCollectCommand::post_collect).
287// FIXME: this enum is significantly large on stack, box it
288#[derive(Debug)]
289pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
    /// all messages before the checkpoint barrier should have been committed.
292    Flush,
293
294    /// `Pause` command generates a `Pause` barrier **only if**
295    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
296    Pause,
297
    /// `Resume` command generates a `Resume` barrier **only if** the cluster is paused for the
    /// same reason. Otherwise, a barrier with no mutation will be generated.
301    Resume,
302
    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given actors
    /// ([`Vec<ActorId>`]). The catalog has already ensured, via reference counting, that these
    /// streaming jobs are safe to be dropped.
306    ///
307    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop the actors, and then deletes the job fragments info from the meta store.
310    DropStreamingJobs {
311        streaming_job_ids: HashSet<JobId>,
312        actors: Vec<ActorId>,
313        unregistered_state_table_ids: HashSet<TableId>,
314        unregistered_fragment_ids: HashSet<FragmentId>,
315        // target_fragment -> [sink_fragments]
316        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
317    },
318
    /// `CreateStreamingJob` command generates an `Add` barrier with the given info.
320    ///
    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
    /// be collected since the barrier should pass through them.
323    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
    /// the job fragments info is added to the meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled; only then is the state of `TableFragments`
    /// set to `Created`.
328    CreateStreamingJob {
329        info: CreateStreamingJobCommandInfo,
330        job_type: CreateStreamingJobType,
331        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
332    },
333
    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
335    /// Mainly used for scaling and migration.
336    ///
    /// Which actors' barriers should be collected, and the post-collect behavior of this command,
    /// are very similar to the `Create` and `Drop` commands, for the added and removed actors respectively.
339    RescheduleFragment {
340        reschedules: HashMap<FragmentId, Reschedule>,
        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`.
342        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
343    },
344
    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
346    /// essentially switching the downstream of the old job fragments to the new ones, and
347    /// dropping the old job fragments. Used for schema change.
348    ///
    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
    /// of the `Merge` executors are additionally changed.
351    ReplaceStreamJob(ReplaceStreamJobPlan),
352
353    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
354    /// changed splits.
355    SourceChangeSplit(SplitState),
356
357    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
358    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
359    Throttle {
360        jobs: HashSet<JobId>,
361        config: HashMap<FragmentId, ThrottleConfig>,
362    },
363
    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
    /// materialize executor to start storing old values for the subscription.
366    CreateSubscription {
367        subscription_id: SubscriptionId,
368        upstream_mv_table_id: TableId,
369        retention_second: u64,
370    },
371
    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
    /// materialize executor to stop storing old values when there is no
    /// subscription depending on it.
375    DropSubscription {
376        subscription_id: SubscriptionId,
377        upstream_mv_table_id: TableId,
378    },
379
380    ConnectorPropsChange(ConnectorPropsChange),
381
    /// `Refresh` command generates a barrier to refresh a table by truncating its state
    /// and reloading data from the source.
384    Refresh {
385        table_id: TableId,
386        associated_source_id: SourceId,
387    },
388    ListFinish {
389        table_id: TableId,
390        associated_source_id: SourceId,
391    },
392    LoadFinish {
393        table_id: TableId,
394        associated_source_id: SourceId,
395    },
396
    /// `ResetSource` command generates a barrier to reset the CDC source offset to the latest.
    /// Used when the upstream binlog/oplog has expired.
399    ResetSource {
400        source_id: SourceId,
401    },
402
403    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
404    /// to resume for troubleshooting.
405    ResumeBackfill {
406        target: ResumeBackfillTarget,
407    },
408}
409
410#[derive(Debug, Clone, Copy)]
411pub enum ResumeBackfillTarget {
412    Job(JobId),
413    Fragment(FragmentId),
414}
415
416// For debugging and observability purposes. Can add more details later if needed.
417impl std::fmt::Display for Command {
418    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
419        match self {
420            Command::Flush => write!(f, "Flush"),
421            Command::Pause => write!(f, "Pause"),
422            Command::Resume => write!(f, "Resume"),
423            Command::DropStreamingJobs {
424                streaming_job_ids, ..
425            } => {
426                write!(
427                    f,
428                    "DropStreamingJobs: {}",
429                    streaming_job_ids.iter().sorted().join(", ")
430                )
431            }
432            Command::CreateStreamingJob { info, .. } => {
433                write!(f, "CreateStreamingJob: {}", info.streaming_job)
434            }
435            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
436            Command::ReplaceStreamJob(plan) => {
437                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
438            }
439            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
440            Command::Throttle { .. } => write!(f, "Throttle"),
441            Command::CreateSubscription {
442                subscription_id, ..
443            } => write!(f, "CreateSubscription: {subscription_id}"),
444            Command::DropSubscription {
445                subscription_id, ..
446            } => write!(f, "DropSubscription: {subscription_id}"),
447            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
448            Command::Refresh {
449                table_id,
450                associated_source_id,
451            } => write!(
452                f,
453                "Refresh: {} (source: {})",
454                table_id, associated_source_id
455            ),
456            Command::ListFinish {
457                table_id,
458                associated_source_id,
459            } => write!(
460                f,
461                "ListFinish: {} (source: {})",
462                table_id, associated_source_id
463            ),
464            Command::LoadFinish {
465                table_id,
466                associated_source_id,
467            } => write!(
468                f,
469                "LoadFinish: {} (source: {})",
470                table_id, associated_source_id
471            ),
472            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
473            Command::ResumeBackfill { target } => match target {
474                ResumeBackfillTarget::Job(job_id) => {
475                    write!(f, "ResumeBackfill: job={job_id}")
476                }
477                ResumeBackfillTarget::Fragment(fragment_id) => {
478                    write!(f, "ResumeBackfill: fragment={fragment_id}")
479                }
480            },
481        }
482    }
483}
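
// Illustrative sketch (not part of the original file): a minimal check of the `Display`
// formatting above for the argument-less commands.
#[cfg(test)]
mod command_display_tests {
    use super::*;

    #[test]
    fn argumentless_commands_display_their_name() {
        assert_eq!(Command::Flush.to_string(), "Flush");
        assert_eq!(Command::pause().to_string(), "Pause");
        assert_eq!(Command::resume().to_string(), "Resume");
    }
}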
484
485impl Command {
486    pub fn pause() -> Self {
487        Self::Pause
488    }
489
490    pub fn resume() -> Self {
491        Self::Resume
492    }
493
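    /// Fragment-level changes this command applies to the inflight graph info.
    ///
    /// Returns `None` if the command does not change the fragment structure; otherwise returns
    /// the newly created job (with an optional CDC backfill tracker), if any, together with the
    /// per-fragment [`CommandFragmentChanges`].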
494    #[expect(clippy::type_complexity)]
495    pub(super) fn fragment_changes(
496        &self,
497    ) -> Option<(
498        Option<(JobId, Option<CdcTableBackfillTracker>)>,
499        HashMap<FragmentId, CommandFragmentChanges>,
500    )> {
501        match self {
502            Command::Flush => None,
503            Command::Pause => None,
504            Command::Resume => None,
505            Command::DropStreamingJobs {
506                unregistered_fragment_ids,
507                dropped_sink_fragment_by_targets,
508                ..
509            } => {
510                let changes = unregistered_fragment_ids
511                    .iter()
512                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
513                    .chain(dropped_sink_fragment_by_targets.iter().map(
514                        |(target_fragment, sink_fragments)| {
515                            (
516                                *target_fragment,
517                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
518                            )
519                        },
520                    ))
521                    .collect();
522
523                Some((None, changes))
524            }
525            Command::CreateStreamingJob { info, job_type, .. } => {
526                assert!(
527                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
528                    "should handle fragment changes separately for snapshot backfill"
529                );
530                let mut changes: HashMap<_, _> = info
531                    .stream_job_fragments
532                    .new_fragment_info(&info.init_split_assignment)
533                    .map(|(fragment_id, fragment_infos)| {
534                        (
535                            fragment_id,
536                            CommandFragmentChanges::NewFragment {
537                                job_id: info.streaming_job.id(),
538                                info: fragment_infos,
539                            },
540                        )
541                    })
542                    .collect();
543
544                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
545                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
546                    changes.insert(
547                        downstream_fragment_id,
548                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
549                            upstream_fragment_id: ctx.sink_fragment_id,
550                            sink_output_schema: ctx.sink_output_fields.clone(),
551                            project_exprs: ctx.project_exprs.clone(),
552                        }),
553                    );
554                }
555
556                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
557                    let (fragment, _) =
558                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
559                            .expect("should have parallel cdc fragment");
560                    Some(CdcTableBackfillTracker::new(
561                        fragment.fragment_id,
562                        splits.clone(),
563                    ))
564                } else {
565                    None
566                };
567
568                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
569            }
570            Command::RescheduleFragment { reschedules, .. } => Some((
571                None,
572                reschedules
573                    .iter()
574                    .map(|(fragment_id, reschedule)| {
575                        (
576                            *fragment_id,
577                            CommandFragmentChanges::Reschedule {
578                                new_actors: reschedule
579                                    .added_actors
580                                    .iter()
581                                    .flat_map(|(node_id, actors)| {
582                                        actors.iter().map(|actor_id| {
583                                            (
584                                                *actor_id,
585                                                InflightActorInfo {
586                                                    worker_id: *node_id,
587                                                    vnode_bitmap: reschedule
588                                                        .newly_created_actors
589                                                        .get(actor_id)
590                                                        .expect("should exist")
591                                                        .0
592                                                        .0
593                                                        .vnode_bitmap
594                                                        .clone(),
595                                                    splits: reschedule
596                                                        .actor_splits
597                                                        .get(actor_id)
598                                                        .cloned()
599                                                        .unwrap_or_default(),
600                                                },
601                                            )
602                                        })
603                                    })
604                                    .collect(),
605                                actor_update_vnode_bitmap: reschedule
606                                    .vnode_bitmap_updates
607                                    .iter()
608                                    .filter(|(actor_id, _)| {
609                                        // only keep the existing actors
610                                        !reschedule.newly_created_actors.contains_key(actor_id)
611                                    })
612                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
613                                    .collect(),
614                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
615                                actor_splits: reschedule.actor_splits.clone(),
616                            },
617                        )
618                    })
619                    .collect(),
620            )),
621            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
622            Command::SourceChangeSplit(SplitState {
623                split_assignment, ..
624            }) => Some((
625                None,
626                split_assignment
627                    .iter()
628                    .map(|(&fragment_id, splits)| {
629                        (
630                            fragment_id,
631                            CommandFragmentChanges::SplitAssignment {
632                                actor_splits: splits.clone(),
633                            },
634                        )
635                    })
636                    .collect(),
637            )),
638            Command::Throttle { .. } => None,
639            Command::CreateSubscription { .. } => None,
640            Command::DropSubscription { .. } => None,
641            Command::ConnectorPropsChange(_) => None,
642            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
643            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
644            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
645            Command::ResetSource { .. } => None, // ResetSource doesn't change fragment structure
646            Command::ResumeBackfill { .. } => None, /* ResumeBackfill doesn't change fragment structure */
647        }
648    }
649
650    pub fn need_checkpoint(&self) -> bool {
        // TODO: review the flow of different commands to reduce the number of checkpoints.
652        !matches!(self, Command::Resume)
653    }
654}
655
656#[derive(Debug)]
657pub enum PostCollectCommand {
658    Command(String),
659    DropStreamingJobs {
660        streaming_job_ids: HashSet<JobId>,
661        unregistered_state_table_ids: HashSet<TableId>,
662    },
663    CreateStreamingJob {
664        info: CreateStreamingJobCommandInfo,
665        job_type: CreateStreamingJobType,
666        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
667    },
668    RescheduleFragment {
669        reschedules: HashMap<FragmentId, Reschedule>,
670    },
671    ReplaceStreamJob(ReplaceStreamJobPlan),
672    SourceChangeSplit {
673        split_assignment: SplitAssignment,
674    },
675    CreateSubscription {
676        subscription_id: SubscriptionId,
677    },
678    ConnectorPropsChange(ConnectorPropsChange),
679    ResumeBackfill {
680        target: ResumeBackfillTarget,
681    },
682}
683
684impl PostCollectCommand {
685    pub fn barrier() -> Self {
686        PostCollectCommand::Command("barrier".to_owned())
687    }
688
689    pub fn command_name(&self) -> &str {
690        match self {
691            PostCollectCommand::Command(name) => name.as_str(),
692            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
693            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
694            PostCollectCommand::RescheduleFragment { .. } => "RescheduleFragment",
695            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
696            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
697            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
698            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
699            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
700        }
701    }
702}
703
704impl Display for PostCollectCommand {
705    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
706        f.write_str(self.command_name())
707    }
708}
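
// Illustrative sketch (not part of the original file): `PostCollectCommand::barrier()` and its
// name-based `Display` implementation above.
#[cfg(test)]
mod post_collect_command_tests {
    use super::*;

    #[test]
    fn barrier_command_name_round_trips_through_display() {
        let cmd = PostCollectCommand::barrier();
        assert_eq!(cmd.command_name(), "barrier");
        assert_eq!(cmd.to_string(), "barrier");
    }
}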
709
710impl Command {
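    /// Converts this command into its [`PostCollectCommand`] form, keeping only the parts that
    /// are still needed after the barrier has been collected.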
711    pub(super) fn into_post_collect(self) -> PostCollectCommand {
712        match self {
713            Command::DropStreamingJobs {
714                streaming_job_ids,
715                unregistered_state_table_ids,
716                ..
717            } => PostCollectCommand::DropStreamingJobs {
718                streaming_job_ids,
719                unregistered_state_table_ids,
720            },
721            Command::CreateStreamingJob {
722                info,
723                job_type,
724                cross_db_snapshot_backfill_info,
725            } => match job_type {
726                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
727                job_type => PostCollectCommand::CreateStreamingJob {
728                    info,
729                    job_type,
730                    cross_db_snapshot_backfill_info,
731                },
732            },
733            Command::RescheduleFragment { reschedules, .. } => {
734                PostCollectCommand::RescheduleFragment { reschedules }
735            }
736            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
737            Command::SourceChangeSplit(SplitState { split_assignment }) => {
738                PostCollectCommand::SourceChangeSplit { split_assignment }
739            }
740            Command::CreateSubscription {
741                subscription_id, ..
742            } => PostCollectCommand::CreateSubscription { subscription_id },
743            Command::ConnectorPropsChange(connector_props_change) => {
744                PostCollectCommand::ConnectorPropsChange(connector_props_change)
745            }
746            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
747            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
748            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
749            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
750            Command::DropSubscription { .. } => {
751                PostCollectCommand::Command("DropSubscription".to_owned())
752            }
753            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
754            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
755            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
756            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
757            Command::ResumeBackfill { target } => PostCollectCommand::ResumeBackfill { target },
758        }
759    }
760}
761
762#[derive(Debug, Clone)]
763pub enum BarrierKind {
764    Initial,
765    Barrier,
    /// Holds the list of prev-epochs of the previous non-checkpoint barriers, plus the current prev-epoch.
767    Checkpoint(Vec<u64>),
768}
769
770impl BarrierKind {
771    pub fn to_protobuf(&self) -> PbBarrierKind {
772        match self {
773            BarrierKind::Initial => PbBarrierKind::Initial,
774            BarrierKind::Barrier => PbBarrierKind::Barrier,
775            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
776        }
777    }
778
779    pub fn is_checkpoint(&self) -> bool {
780        matches!(self, BarrierKind::Checkpoint(_))
781    }
782
783    pub fn is_initial(&self) -> bool {
784        matches!(self, BarrierKind::Initial)
785    }
786
787    pub fn as_str_name(&self) -> &'static str {
788        match self {
789            BarrierKind::Initial => "Initial",
790            BarrierKind::Barrier => "Barrier",
791            BarrierKind::Checkpoint(_) => "Checkpoint",
792        }
793    }
794}
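
// Illustrative sketch (not part of the original file): the `BarrierKind` helpers above, exercised
// on each variant. The epoch values in `Checkpoint` are arbitrary.
#[cfg(test)]
mod barrier_kind_tests {
    use super::*;

    #[test]
    fn kind_helpers() {
        assert!(BarrierKind::Initial.is_initial());
        assert!(!BarrierKind::Barrier.is_checkpoint());
        let checkpoint = BarrierKind::Checkpoint(vec![1, 2, 3]);
        assert!(checkpoint.is_checkpoint());
        assert_eq!(checkpoint.as_str_name(), "Checkpoint");
        assert_eq!(checkpoint.to_protobuf(), PbBarrierKind::Checkpoint);
    }
}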
795
/// [`CommandContext`] is used for generating barriers and doing post-collect work according to the
/// given [`Command`].
798pub(super) struct CommandContext {
799    mv_subscription_max_retention: HashMap<TableId, u64>,
800
801    pub(super) barrier_info: BarrierInfo,
802
803    pub(super) table_ids_to_commit: HashSet<TableId>,
804
805    pub(super) command: PostCollectCommand,
806
807    /// The tracing span of this command.
808    ///
    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` work.
812    _span: tracing::Span,
813}
814
815impl std::fmt::Debug for CommandContext {
816    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
817        f.debug_struct("CommandContext")
818            .field("barrier_info", &self.barrier_info)
819            .field("command", &self.command.command_name())
820            .finish()
821    }
822}
823
824impl CommandContext {
825    pub(super) fn new(
826        barrier_info: BarrierInfo,
827        mv_subscription_max_retention: HashMap<TableId, u64>,
828        table_ids_to_commit: HashSet<TableId>,
829        command: PostCollectCommand,
830        span: tracing::Span,
831    ) -> Self {
832        Self {
833            mv_subscription_max_retention,
834            barrier_info,
835            table_ids_to_commit,
836            command,
837            _span: span,
838        }
839    }
840
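    // Sketch of the arithmetic in `get_truncate_epoch` (illustrative numbers): if the prev-epoch's
    // physical time is 2024-01-01T00:00:10Z (unix 1_704_067_210 s) and `retention_second` is 5,
    // the truncate timestamp is 1_704_067_205 s and the returned epoch is
    // `Epoch::from_unix_millis(1_704_067_205_000)`. If the subtraction falls outside the valid
    // timestamp range, a warning is logged and the prev-epoch itself is returned.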
841    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
842        let Some(truncate_timestamptz) = Timestamptz::from_secs(
843            self.barrier_info
844                .prev_epoch
845                .value()
846                .as_timestamptz()
847                .timestamp()
848                - retention_second as i64,
849        ) else {
850            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
851            return self.barrier_info.prev_epoch.value();
852        };
853        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
854    }
855
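    /// Merges the `BarrierCompleteResponse`s of this barrier into the [`CommitEpochInfo`]:
    /// synced SSTs, table watermarks, new table fragment infos, change-log deltas (with
    /// per-table truncate epochs) and vector index deltas.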
856    pub(super) fn collect_commit_epoch_info(
857        &self,
858        info: &mut CommitEpochInfo,
859        resps: Vec<BarrierCompleteResponse>,
860        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
861    ) {
862        let (
863            sst_to_context,
864            synced_ssts,
865            new_table_watermarks,
866            old_value_ssts,
867            vector_index_adds,
868            truncate_tables,
869        ) = collect_resp_info(resps);
870
871        let new_table_fragment_infos =
872            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
873                assert!(!matches!(
874                    job_type,
875                    CreateStreamingJobType::SnapshotBackfill(_)
876                ));
877                let table_fragments = &info.stream_job_fragments;
878                let mut table_ids: HashSet<_> =
879                    table_fragments.internal_table_ids().into_iter().collect();
880                if let Some(mv_table_id) = table_fragments.mv_table_id() {
881                    table_ids.insert(mv_table_id);
882                }
883
884                vec![NewTableFragmentInfo { table_ids }]
885            } else {
886                vec![]
887            };
888
889        let mut mv_log_store_truncate_epoch = HashMap::new();
890        // TODO: may collect cross db snapshot backfill
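        // For each upstream MV table, keep the smallest truncate epoch among all consumers
        // (subscriptions and pinned backfills), so the log store is retained for the slowest one.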
891        let mut update_truncate_epoch =
892            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
893                Entry::Occupied(mut entry) => {
894                    let prev_truncate_epoch = entry.get_mut();
895                    if truncate_epoch < *prev_truncate_epoch {
896                        *prev_truncate_epoch = truncate_epoch;
897                    }
898                }
899                Entry::Vacant(entry) => {
900                    entry.insert(truncate_epoch);
901                }
902            };
903        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
904            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
905            update_truncate_epoch(*mv_table_id, truncate_epoch);
906        }
907        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
908            for mv_table_id in upstream_mv_table_ids {
909                update_truncate_epoch(mv_table_id, backfill_epoch);
910            }
911        }
912
913        let table_new_change_log = build_table_change_log_delta(
914            old_value_ssts.into_iter(),
915            synced_ssts.iter().map(|sst| &sst.sst_info),
916            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
917            mv_log_store_truncate_epoch.into_iter(),
918        );
919
920        let epoch = self.barrier_info.prev_epoch();
921        for table_id in &self.table_ids_to_commit {
922            info.tables_to_commit
923                .try_insert(*table_id, epoch)
924                .expect("non duplicate");
925        }
926
927        info.sstables.extend(synced_ssts);
928        info.new_table_watermarks.extend(new_table_watermarks);
929        info.sst_to_context.extend(sst_to_context);
930        info.new_table_fragment_infos
931            .extend(new_table_fragment_infos);
932        info.change_log_delta.extend(table_new_change_log);
933        for (table_id, vector_index_adds) in vector_index_adds {
934            info.vector_index_delta
935                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
936                .expect("non-duplicate");
937        }
938        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
939            && let Some(index_table) = collect_new_vector_index_info(job_info)
940        {
941            info.vector_index_delta
942                .try_insert(
943                    index_table.id,
944                    VectorIndexDelta::Init(PbVectorIndexInit {
945                        info: Some(index_table.vector_index_info.unwrap()),
946                    }),
947                )
948                .expect("non-duplicate");
949        }
950        info.truncate_tables.extend(truncate_tables);
951    }
952}
953
954impl Command {
955    /// Generate a mutation for the given command.
956    ///
    /// `edges` contains the `dispatcher`s of the `DispatchExecutor`s and the `actor_upstreams` of the `MergeNode`s.
958    pub(super) fn to_mutation(
959        &self,
960        is_currently_paused: bool,
961        edges: &mut Option<FragmentEdgeBuildResult>,
962        control_stream_manager: &ControlStreamManager,
963        database_info: &mut InflightDatabaseInfo,
964    ) -> MetaResult<Option<Mutation>> {
965        let database_id = database_info.database_id;
966        let mutation = match self {
967            Command::Flush => None,
968
969            Command::Pause => {
970                // Only pause when the cluster is not already paused.
971                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
972                if !is_currently_paused {
973                    Some(Mutation::Pause(PauseMutation {}))
974                } else {
975                    None
976                }
977            }
978
979            Command::Resume => {
980                // Only resume when the cluster is paused with the same reason.
981                if is_currently_paused {
982                    Some(Mutation::Resume(ResumeMutation {}))
983                } else {
984                    None
985                }
986            }
987
988            Command::SourceChangeSplit(SplitState {
989                split_assignment, ..
990            }) => {
991                let mut diff = HashMap::new();
992
993                for actor_splits in split_assignment.values() {
994                    diff.extend(actor_splits.clone());
995                }
996
997                Some(Mutation::Splits(SourceChangeSplitMutation {
998                    actor_splits: build_actor_connector_splits(&diff),
999                }))
1000            }
1001
1002            Command::Throttle { config, .. } => {
1003                let config = config.clone();
1004                Some(Mutation::Throttle(ThrottleMutation {
1005                    fragment_throttle: config,
1006                }))
1007            }
1008
1009            Command::DropStreamingJobs {
1010                actors,
1011                dropped_sink_fragment_by_targets,
1012                ..
1013            } => Some(Mutation::Stop(StopMutation {
1014                actors: actors.clone(),
1015                dropped_sink_fragments: dropped_sink_fragment_by_targets
1016                    .values()
1017                    .flatten()
1018                    .cloned()
1019                    .collect(),
1020            })),
1021
1022            Command::CreateStreamingJob {
1023                info:
1024                    CreateStreamingJobCommandInfo {
1025                        stream_job_fragments,
1026                        init_split_assignment: split_assignment,
1027                        upstream_fragment_downstreams,
1028                        fragment_backfill_ordering,
1029                        ..
1030                    },
1031                job_type,
1032                ..
1033            } => {
1034                let edges = edges.as_mut().expect("should exist");
1035                let added_actors = stream_job_fragments.actor_ids().collect();
1036                let actor_splits = split_assignment
1037                    .values()
1038                    .flat_map(build_actor_connector_splits)
1039                    .collect();
1040                let subscriptions_to_add =
1041                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
1042                        job_type
1043                    {
1044                        snapshot_backfill_info
1045                            .upstream_mv_table_id_to_backfill_epoch
1046                            .keys()
1047                            .map(|table_id| SubscriptionUpstreamInfo {
1048                                subscriber_id: stream_job_fragments
1049                                    .stream_job_id()
1050                                    .as_subscriber_id(),
1051                                upstream_mv_table_id: *table_id,
1052                            })
1053                            .collect()
1054                    } else {
1055                        Default::default()
1056                    };
1057                let backfill_nodes_to_pause: Vec<_> =
1058                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1059                        .into_iter()
1060                        .collect();
1061
1062                let new_upstream_sinks =
1063                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1064                        sink_fragment_id,
1065                        sink_output_fields,
1066                        project_exprs,
1067                        new_sink_downstream,
1068                        ..
1069                    }) = job_type
1070                    {
1071                        let new_sink_actors = stream_job_fragments
1072                            .actors_to_create()
1073                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
1074                            .exactly_one()
1075                            .map(|(_, _, actors)| {
1076                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
1077                                    actor_id: actor.actor_id,
1078                                    host: Some(control_stream_manager.host_addr(worker_id)),
1079                                    partial_graph_id: to_partial_graph_id(database_id, None),
1080                                })
1081                            })
1082                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
1083                        let new_upstream_sink = PbNewUpstreamSink {
1084                            info: Some(PbUpstreamSinkInfo {
1085                                upstream_fragment_id: *sink_fragment_id,
1086                                sink_output_schema: sink_output_fields.clone(),
1087                                project_exprs: project_exprs.clone(),
1088                            }),
1089                            upstream_actors: new_sink_actors.collect(),
1090                        };
1091                        HashMap::from([(
1092                            new_sink_downstream.downstream_fragment_id,
1093                            new_upstream_sink,
1094                        )])
1095                    } else {
1096                        HashMap::new()
1097                    };
1098
1099                let actor_cdc_table_snapshot_splits =
1100                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1101                        database_info
1102                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1103                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1104                    } else {
1105                        None
1106                    };
1107
1108                let add_mutation = AddMutation {
1109                    actor_dispatchers: edges
1110                        .dispatchers
1111                        .extract_if(|fragment_id, _| {
1112                            upstream_fragment_downstreams.contains_key(fragment_id)
1113                        })
1114                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1115                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1116                        .collect(),
1117                    added_actors,
1118                    actor_splits,
1119                    // If the cluster is already paused, the new actors should be paused too.
1120                    pause: is_currently_paused,
1121                    subscriptions_to_add,
1122                    backfill_nodes_to_pause,
1123                    actor_cdc_table_snapshot_splits,
1124                    new_upstream_sinks,
1125                };
1126
1127                Some(Mutation::Add(add_mutation))
1128            }
1129
1130            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1131                old_fragments,
1132                replace_upstream,
1133                upstream_fragment_downstreams,
1134                init_split_assignment,
1135                auto_refresh_schema_sinks,
1136                ..
1137            }) => {
1138                let edges = edges.as_mut().expect("should exist");
1139                let merge_updates = edges
1140                    .merge_updates
1141                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1142                    .collect();
1143                let dispatchers = edges
1144                    .dispatchers
1145                    .extract_if(|fragment_id, _| {
1146                        upstream_fragment_downstreams.contains_key(fragment_id)
1147                    })
1148                    .collect();
1149                let actor_cdc_table_snapshot_splits = database_info
1150                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1151                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1152                Self::generate_update_mutation_for_replace_table(
1153                    old_fragments.actor_ids().chain(
1154                        auto_refresh_schema_sinks
1155                            .as_ref()
1156                            .into_iter()
1157                            .flat_map(|sinks| {
1158                                sinks.iter().flat_map(|sink| {
1159                                    sink.original_fragment
1160                                        .actors
1161                                        .iter()
1162                                        .map(|actor| actor.actor_id)
1163                                })
1164                            }),
1165                    ),
1166                    merge_updates,
1167                    dispatchers,
1168                    init_split_assignment,
1169                    actor_cdc_table_snapshot_splits,
1170                    auto_refresh_schema_sinks.as_ref(),
1171                )
1172            }
1173
1174            Command::RescheduleFragment {
1175                reschedules,
1176                fragment_actors,
1177                ..
1178            } => {
1179                let mut dispatcher_update = HashMap::new();
1180                for reschedule in reschedules.values() {
1181                    for &(upstream_fragment_id, dispatcher_id) in
1182                        &reschedule.upstream_fragment_dispatcher_ids
1183                    {
1184                        // Find the actors of the upstream fragment.
1185                        let upstream_actor_ids = fragment_actors
1186                            .get(&upstream_fragment_id)
1187                            .expect("should contain");
1188
1189                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1190
1191                        // Record updates for all actors.
1192                        for &actor_id in upstream_actor_ids {
1193                            let added_downstream_actor_id = if upstream_reschedule
1194                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1195                                .unwrap_or(true)
1196                            {
1197                                reschedule
1198                                    .added_actors
1199                                    .values()
1200                                    .flatten()
1201                                    .cloned()
1202                                    .collect()
1203                            } else {
1204                                Default::default()
1205                            };
1206                            // Index with the dispatcher id to check duplicates.
1207                            dispatcher_update
1208                                .try_insert(
1209                                    (actor_id, dispatcher_id),
1210                                    DispatcherUpdate {
1211                                        actor_id,
1212                                        dispatcher_id,
1213                                        hash_mapping: reschedule
1214                                            .upstream_dispatcher_mapping
1215                                            .as_ref()
1216                                            .map(|m| m.to_protobuf()),
1217                                        added_downstream_actor_id,
1218                                        removed_downstream_actor_id: reschedule
1219                                            .removed_actors
1220                                            .iter()
1221                                            .cloned()
1222                                            .collect(),
1223                                    },
1224                                )
1225                                .unwrap();
1226                        }
1227                    }
1228                }
1229                let dispatcher_update = dispatcher_update.into_values().collect();
1230
1231                let mut merge_update = HashMap::new();
1232                for (&fragment_id, reschedule) in reschedules {
1233                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1234                        // Find the actors of the downstream fragment.
1235                        let downstream_actor_ids = fragment_actors
1236                            .get(&downstream_fragment_id)
1237                            .expect("should contain");
1238
                        // Removed downstream actors should be skipped: newly created actors of the
                        // current fragment will not dispatch `Update` barriers to them.
1242                        let downstream_removed_actors: HashSet<_> = reschedules
1243                            .get(&downstream_fragment_id)
1244                            .map(|downstream_reschedule| {
1245                                downstream_reschedule
1246                                    .removed_actors
1247                                    .iter()
1248                                    .copied()
1249                                    .collect()
1250                            })
1251                            .unwrap_or_default();
1252
1253                        // Record updates for all actors.
1254                        for &actor_id in downstream_actor_ids {
1255                            if downstream_removed_actors.contains(&actor_id) {
1256                                continue;
1257                            }
1258
1259                            // Index by (actor_id, upstream_fragment_id) to detect duplicate updates.
1260                            merge_update
1261                                .try_insert(
1262                                    (actor_id, fragment_id),
1263                                    MergeUpdate {
1264                                        actor_id,
1265                                        upstream_fragment_id: fragment_id,
1266                                        new_upstream_fragment_id: None,
1267                                        added_upstream_actors: reschedule
1268                                            .added_actors
1269                                            .iter()
1270                                            .flat_map(|(worker_id, actors)| {
1271                                                let host =
1272                                                    control_stream_manager.host_addr(*worker_id);
1273                                                actors.iter().map(move |&actor_id| PbActorInfo {
1274                                                    actor_id,
1275                                                    host: Some(host.clone()),
1276                                                    // We assume that rescheduling only happens within the database's own partial graph.
1277                                                    partial_graph_id: to_partial_graph_id(
1278                                                        database_id,
1279                                                        None,
1280                                                    ),
1281                                                })
1282                                            })
1283                                            .collect(),
1284                                        removed_upstream_actor_id: reschedule
1285                                            .removed_actors
1286                                            .iter()
1287                                            .cloned()
1288                                            .collect(),
1289                                    },
1290                                )
1291                                .unwrap();
1292                        }
1293                    }
1294                }
1295                let merge_update = merge_update.into_values().collect();
1296
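                // Record the new vnode bitmap for every actor whose vnode distribution changes.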
1297                let mut actor_vnode_bitmap_update = HashMap::new();
1298                for reschedule in reschedules.values() {
1299                    // Record updates for all actors in this fragment.
1300                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1301                        let bitmap = bitmap.to_protobuf();
1302                        actor_vnode_bitmap_update
1303                            .try_insert(actor_id, bitmap)
1304                            .unwrap();
1305                    }
1306                }
1307                let dropped_actors = reschedules
1308                    .values()
1309                    .flat_map(|r| r.removed_actors.iter().copied())
1310                    .collect();
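                // Carry the reassigned source splits and, if any, the CDC table snapshot
                // splits for the rescheduled fragments.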
1311                let mut actor_splits = HashMap::new();
1312                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1313                for (fragment_id, reschedule) in reschedules {
1314                    for (actor_id, splits) in &reschedule.actor_splits {
1315                        actor_splits.insert(
1316                            *actor_id,
1317                            ConnectorSplits {
1318                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1319                            },
1320                        );
1321                    }
1322
1323                    if let Some(assignment) =
1324                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1325                    {
1326                        actor_cdc_table_snapshot_splits.extend(assignment)
1327                    }
1328                }
1329
1330                // We don't create new dispatchers in the reschedule scenario.
1331                let actor_new_dispatchers = HashMap::new();
1332                let mutation = Mutation::Update(UpdateMutation {
1333                    dispatcher_update,
1334                    merge_update,
1335                    actor_vnode_bitmap_update,
1336                    dropped_actors,
1337                    actor_splits,
1338                    actor_new_dispatchers,
1339                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1340                        splits: actor_cdc_table_snapshot_splits,
1341                    }),
1342                    sink_schema_change: Default::default(),
1343                    subscriptions_to_drop: vec![],
1344                });
1345                tracing::debug!("update mutation: {mutation:?}");
1346                Some(mutation)
1347            }
1348
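            // A new subscription is announced through an otherwise-empty `Add` mutation that
            // registers the subscriber on its upstream MV table.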
1349            Command::CreateSubscription {
1350                upstream_mv_table_id,
1351                subscription_id,
1352                ..
1353            } => Some(Mutation::Add(AddMutation {
1354                actor_dispatchers: Default::default(),
1355                added_actors: vec![],
1356                actor_splits: Default::default(),
1357                pause: false,
1358                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1359                    upstream_mv_table_id: *upstream_mv_table_id,
1360                    subscriber_id: subscription_id.as_subscriber_id(),
1361                }],
1362                backfill_nodes_to_pause: vec![],
1363                actor_cdc_table_snapshot_splits: None,
1364                new_upstream_sinks: Default::default(),
1365            })),
1366            Command::DropSubscription {
1367                upstream_mv_table_id,
1368                subscription_id,
1369            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1370                info: vec![SubscriptionUpstreamInfo {
1371                    subscriber_id: subscription_id.as_subscriber_id(),
1372                    upstream_mv_table_id: *upstream_mv_table_id,
1373                }],
1374            })),
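            // Forward the changed connector properties, keyed by the owning object's raw id.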
1375            Command::ConnectorPropsChange(config) => {
1376                let mut connector_props_infos = HashMap::default();
1377                for (k, v) in config {
1378                    connector_props_infos.insert(
1379                        k.as_raw_id(),
1380                        ConnectorPropsInfo {
1381                            connector_props_info: v.clone(),
1382                        },
1383                    );
1384                }
1385                Some(Mutation::ConnectorPropsChange(
1386                    ConnectorPropsChangeMutation {
1387                        connector_props_infos,
1388                    },
1389                ))
1390            }
1391            Command::Refresh {
1392                table_id,
1393                associated_source_id,
1394            } => Some(Mutation::RefreshStart(
1395                risingwave_pb::stream_plan::RefreshStartMutation {
1396                    table_id: *table_id,
1397                    associated_source_id: *associated_source_id,
1398                },
1399            )),
1400            Command::ListFinish {
1401                table_id: _,
1402                associated_source_id,
1403            } => Some(Mutation::ListFinish(ListFinishMutation {
1404                associated_source_id: *associated_source_id,
1405            })),
1406            Command::LoadFinish {
1407                table_id: _,
1408                associated_source_id,
1409            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1410                associated_source_id: *associated_source_id,
1411            })),
1412            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1413                risingwave_pb::stream_plan::ResetSourceMutation {
1414                    source_id: source_id.as_raw_id(),
1415                },
1416            )),
1417            Command::ResumeBackfill { target } => {
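                // Resolve the target into a concrete set of backfill fragment ids: either all
                // backfill fragments of a job, or a single fragment validated to be one.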
1418                let fragment_ids: HashSet<_> = match target {
1419                    ResumeBackfillTarget::Job(job_id) => {
1420                        database_info.backfill_fragment_ids_for_job(*job_id)?
1421                    }
1422                    ResumeBackfillTarget::Fragment(fragment_id) => {
1423                        if !database_info.is_backfill_fragment(*fragment_id)? {
1424                            return Err(MetaError::invalid_parameter(format!(
1425                                "fragment {} is not a backfill node",
1426                                fragment_id
1427                            )));
1428                        }
1429                        HashSet::from([*fragment_id])
1430                    }
1431                };
1432                if fragment_ids.is_empty() {
1433                    warn!(
1434                        ?target,
1435                        "resume backfill command ignored because no backfill fragments found"
1436                    );
1437                    None
1438                } else {
1439                    Some(Mutation::StartFragmentBackfill(
1440                        StartFragmentBackfillMutation {
1441                            fragment_ids: fragment_ids.into_iter().collect(),
1442                        },
1443                    ))
1444                }
1445            }
1446        };
1447        Ok(mutation)
1448    }
1449
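    /// Returns the new actors this command needs to create, grouped by worker and fragment,
    /// or `None` if the command creates no actors. Snapshot-backfill jobs are excluded here
    /// and handled separately.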
1450    pub(super) fn actors_to_create(
1451        &self,
1452        database_info: &InflightDatabaseInfo,
1453        edges: &mut Option<FragmentEdgeBuildResult>,
1454        control_stream_manager: &ControlStreamManager,
1455    ) -> Option<StreamJobActorsToCreate> {
1456        match self {
1457            Command::CreateStreamingJob { info, job_type, .. } => {
1458                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1459                    // For snapshot backfill, the actors to create are handled separately.
1460                    return None;
1461                }
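                // Otherwise, build the per-worker creation plan from the job's fragments and
                // the pre-built fragment edges.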
1462                let actors_to_create = info.stream_job_fragments.actors_to_create();
1463                let edges = edges.as_mut().expect("should exist");
1464                Some(edges.collect_actors_to_create(actors_to_create.map(
1465                    |(fragment_id, node, actors)| {
1466                        (
1467                            fragment_id,
1468                            node,
1469                            actors,
1470                            [], // no subscribers for a newly created job
1471                        )
1472                    },
1473                )))
1474            }
1475            Command::RescheduleFragment {
1476                reschedules,
1477                fragment_actors,
1478                ..
1479            } => {
1480                // We assume that we only scale actors within the database's partial graph.
1481                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1482                    reschedules.iter().map(|(fragment_id, reschedule)| {
1483                        (
1484                            *fragment_id,
1485                            reschedule.newly_created_actors.values().map(
1486                                |((actor, dispatchers), _)| {
1487                                    (actor.actor_id, dispatchers.as_slice())
1488                                },
1489                            ),
1490                        )
1491                    }),
1492                    Some((reschedules, fragment_actors)),
1493                    database_info,
1494                    control_stream_manager,
1495                );
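                // Group the newly created actors by worker and fragment, attaching each actor's
                // upstreams together with the fragment node and its subscribers.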
1496                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1497                for (fragment_id, (actor, dispatchers), worker_id) in
1498                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1499                        reschedule
1500                            .newly_created_actors
1501                            .values()
1502                            .map(|(actors, status)| (*fragment_id, actors, status))
1503                    })
1504                {
1505                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1506                    map.entry(*worker_id)
1507                        .or_default()
1508                        .entry(fragment_id)
1509                        .or_insert_with(|| {
1510                            let node = database_info.fragment(fragment_id).nodes.clone();
1511                            let subscribers =
1512                                database_info.fragment_subscribers(fragment_id).collect();
1513                            (node, vec![], subscribers)
1514                        })
1515                        .1
1516                        .push((actor.clone(), upstreams, dispatchers.clone()));
1517                }
1518                Some(map)
1519            }
1520            Command::ReplaceStreamJob(replace_table) => {
1521                let edges = edges.as_mut().expect("should exist");
1522                let mut actors = edges.collect_actors_to_create(
1523                    replace_table.new_fragments.actors_to_create().map(
1524                        |(fragment_id, node, actors)| {
1525                            (
1526                                fragment_id,
1527                                node,
1528                                actors,
1529                                database_info
1530                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1531                            )
1532                        },
1533                    ),
1534                );
1535                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1536                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1537                        (
1538                            sink.new_fragment.fragment_id,
1539                            &sink.new_fragment.nodes,
1540                            sink.new_fragment.actors.iter().map(|actor| {
1541                                (
1542                                    actor,
1543                                    sink.actor_status[&actor.actor_id]
1544                                        .location
1545                                        .as_ref()
1546                                        .unwrap()
1547                                        .worker_node_id,
1548                                )
1549                            }),
1550                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1551                        )
1552                    }));
1553                    for (worker_id, fragment_actors) in sink_actors {
1554                        actors.entry(worker_id).or_default().extend(fragment_actors);
1555                    }
1556                }
1557                Some(actors)
1558            }
1559            _ => None,
1560        }
1561    }
1562
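    /// Builds the `Update` mutation used when replacing a stream job: it installs the new
    /// dispatchers, rewires downstream Merge executors, drops the old actors, carries the
    /// initial split assignments, and attaches schema changes for auto-refresh-schema sinks.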
1563    fn generate_update_mutation_for_replace_table(
1564        dropped_actors: impl IntoIterator<Item = ActorId>,
1565        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1566        dispatchers: FragmentActorDispatchers,
1567        init_split_assignment: &SplitAssignment,
1568        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1569        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1570    ) -> Option<Mutation> {
1571        let dropped_actors = dropped_actors.into_iter().collect();
1572
1573        let actor_new_dispatchers = dispatchers
1574            .into_values()
1575            .flatten()
1576            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1577            .collect();
1578
1579        let actor_splits = init_split_assignment
1580            .values()
1581            .flat_map(build_actor_connector_splits)
1582            .collect();
1583        Some(Mutation::Update(UpdateMutation {
1584            actor_new_dispatchers,
1585            merge_update: merge_updates.into_values().flatten().collect(),
1586            dropped_actors,
1587            actor_splits,
1588            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1589            sink_schema_change: auto_refresh_schema_sinks
1590                .as_ref()
1591                .into_iter()
1592                .flat_map(|sinks| {
1593                    sinks.iter().map(|sink| {
1594                        (
1595                            sink.original_sink.id.as_raw_id(),
1596                            PbSinkSchemaChange {
1597                                original_schema: sink
1598                                    .original_sink
1599                                    .columns
1600                                    .iter()
1601                                    .map(|col| PbField {
1602                                        data_type: Some(
1603                                            col.column_desc
1604                                                .as_ref()
1605                                                .unwrap()
1606                                                .column_type
1607                                                .as_ref()
1608                                                .unwrap()
1609                                                .clone(),
1610                                        ),
1611                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1612                                    })
1613                                    .collect(),
1614                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1615                                    fields: sink
1616                                        .newly_add_fields
1617                                        .iter()
1618                                        .map(|field| field.to_prost())
1619                                        .collect(),
1620                                })),
1621                            },
1622                        )
1623                    })
1624                })
1625                .collect(),
1626            ..Default::default()
1627        }))
1628    }
1629}
1630
1631impl Command {
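    /// Collects, for each downstream actor, the upstream actor infos it should register
    /// (actor id, host, and the database-level partial graph id), derived from the given
    /// dispatchers and, when rescheduling, from the surviving actors of upstream fragments.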
1632    #[expect(clippy::type_complexity)]
1633    pub(super) fn collect_database_partial_graph_actor_upstreams(
1634        actor_dispatchers: impl Iterator<
1635            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1636        >,
1637        reschedule_dispatcher_update: Option<(
1638            &HashMap<FragmentId, Reschedule>,
1639            &HashMap<FragmentId, HashSet<ActorId>>,
1640        )>,
1641        database_info: &InflightDatabaseInfo,
1642        control_stream_manager: &ControlStreamManager,
1643    ) -> HashMap<ActorId, ActorUpstreams> {
1644        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1645        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1646            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1647            for (upstream_actor_id, dispatchers) in upstream_actors {
1648                let upstream_actor_location =
1649                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1650                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1651                for downstream_actor_id in dispatchers
1652                    .iter()
1653                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1654                {
1655                    actor_upstreams
1656                        .entry(*downstream_actor_id)
1657                        .or_default()
1658                        .entry(upstream_fragment_id)
1659                        .or_default()
1660                        .insert(
1661                            upstream_actor_id,
1662                            PbActorInfo {
1663                                actor_id: upstream_actor_id,
1664                                host: Some(upstream_actor_host.clone()),
1665                                partial_graph_id: to_partial_graph_id(
1666                                    database_info.database_id,
1667                                    None,
1668                                ),
1669                            },
1670                        );
1671                }
1672            }
1673        }
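        // For reschedules, additionally register the surviving actors of each upstream fragment
        // as upstreams of the newly added downstream actors.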
1674        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1675            for reschedule in reschedules.values() {
1676                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1677                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1678                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1679                    for upstream_actor_id in fragment_actors
1680                        .get(upstream_fragment_id)
1681                        .expect("should exist")
1682                    {
1683                        let upstream_actor_location =
1684                            upstream_fragment.actors[upstream_actor_id].worker_id;
1685                        let upstream_actor_host =
1686                            control_stream_manager.host_addr(upstream_actor_location);
1687                        if let Some(upstream_reschedule) = upstream_reschedule
1688                            && upstream_reschedule
1689                                .removed_actors
1690                                .contains(upstream_actor_id)
1691                        {
1692                            continue;
1693                        }
1694                        for (_, downstream_actor_id) in
1695                            reschedule
1696                                .added_actors
1697                                .iter()
1698                                .flat_map(|(worker_id, actors)| {
1699                                    actors.iter().map(|actor| (*worker_id, *actor))
1700                                })
1701                        {
1702                            actor_upstreams
1703                                .entry(downstream_actor_id)
1704                                .or_default()
1705                                .entry(*upstream_fragment_id)
1706                                .or_default()
1707                                .insert(
1708                                    *upstream_actor_id,
1709                                    PbActorInfo {
1710                                        actor_id: *upstream_actor_id,
1711                                        host: Some(upstream_actor_host.clone()),
1712                                        partial_graph_id: to_partial_graph_id(
1713                                            database_info.database_id,
1714                                            None,
1715                                        ),
1716                                    },
1717                                );
1718                        }
1719                    }
1720                }
1721            }
1722        }
1723        actor_upstreams
1724    }
1725}